SegmentFault 空中漫步最新的文章
2021-08-14T22:10:03+08:00
https://segmentfault.com/feeds/blogs
https://creativecommons.org/licenses/by-nc-nd/4.0/
记一次线上频繁GC
https://segmentfault.com/a/1190000040516289
2021-08-14T22:10:03+08:00
2021-08-14T22:10:03+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
0
<h2>内存泄露Bug现场</h2><p>线上某核心链路服务的一个节点疯狂GC,监控图如下:<br><img src="/img/bVcUajw" alt="" title=""></p><p>平均1分钟触发CMS GC 36次,已无法正常处理线上请求。</p><h2>准备工作</h2><p>发现该节点有问题后,找运维将该节点从服务注册中心上<code>摘掉</code>,因为我们需要去jmap dump服务的堆栈信息,而dump内存会<code>STW</code>,必须先摘流。</p><p>dump命令如下:</p><pre><code>jmap -dump:format=b,file=heap.bin [pid]</code></pre><p>dump好以后gzip压缩,便于文件传输到本地,从2G压缩到300+M左右。</p><h2>分析</h2><p>将dump的文件导入到MAT中,MAT内存分布图如下:<br><img src="/img/bVcUajy" alt="" title=""></p><p>1.2G的内存都被AccountChangeTask中的ConcurrentHashMap对象占用了,那思路就很清晰了,去检查代码中什么地方使用了 这个AccountChangeTask对象。</p><p>AccountChangeTask的整体结构如下:</p><pre><code>@Service
public class AccountChangeTask {
// 缓存SQL和tableName映射关系
private static final Map<String, String> sqlMap = new ConcurrentHashMap<>();
@Async
public void processTask(String sql) {
// 对sqlMap对象的get put操作,key是SQL,value是表名
// 原因是逻辑中有一些对SQL做正则解析的操作,可能比较耗时和耗CPU,所以想通过缓存优化
... 其他业务逻辑
}
}</code></pre><p>我们再去查找使用了AccountChangeTask.processTask()方法的地方,代码如下:</p><pre><code>@Intercepts({
@Signature(type = Executor.class, method = "update", args = { MappedStatement.class, Object.class }),
@Signature(type = Executor.class, method = "query", args = { MappedStatement.class, Object.class,
RowBounds.class, ResultHandler.class }) })
@Component
public class MybatisInterceptor implements Interceptor {
@Autowired
private AccountChangeTask accountChangeTask;
@Override
public Object intercept(Invocation invocation) throws Throwable {
// 省略...
String sql = showSql(config,boundSql) // 填充为真实的SQL,将?填充为真实的SQL参数
// 业务逻辑判断,如果true走下面逻辑
accountChangeTask.processTask(sql);
}
}</code></pre><h2>问题解决</h2><p>分析代码,原因就是缓存在ConcurrentHashMap中的SQL是被参数填充过的SQL,而线上环境的sql参数千变万化,有不同uid和时间等等,请求量一上来就把ConcurrentHashMap撑爆了。</p><p>解决思路其实也很简单:在对性能没有极致要求的情况下,移除代码中对SQL的缓存;而直接走正则逻辑 并且 提前对正则表达式做好编译,可能是更合理的选择。</p><h2>总结</h2><p>在没有极致性能要求的情况下,简化我们的设计,服务可能会更具健壮性。</p>
用MAT定位高负载线程
https://segmentfault.com/a/1190000022555421
2020-05-06T13:59:00+08:00
2020-05-06T13:59:00+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
1
<h2>背景</h2>
<p>某广告引擎服务的负载很高,之前用top、jstack来抓高负载的线程,发现高负载的线程都是没有自定义名称的,用的还是默认的命名方式,如下:</p>
<pre><code>48385 www-data 20 0 17.0t 216.8g 209.6g S 1.6 172 111:21.51 pool-14-thread-
48389 www-data 20 0 17.0t 216.8g 209.6g S 1.6 172 111:21.88 pool-14-thread-
48391 www-data 20 0 17.0t 216.8g 209.6g S 1.6 172 111:26.01 pool-14-thread-
48393 www-data 20 0 17.0t 216.8g 209.6g S 1.6 172 111:31.96 pool-14-thread-
48405 www-data 20 0 17.0t 216.8g 209.6g S 1.6 172 111:26.11 pool-14-thread-
47510 www-data 20 0 17.0t 216.8g 209.6g S 1.3 172 26:31.58 feature-task-8-
48355 www-data 20 0 17.0t 216.8g 209.6g S 1.3 172 111:02.62 pool-14-thread-
48357 www-data 20 0 17.0t 216.8g 209.6g S 1.3 172 111:01.62 pool-14-thread-
48359 www-data 20 0 17.0t 216.8g 209.6g S 1.3 172 111:04.75 pool-14-thread-
48360 www-data 20 0 17.0t 216.8g 209.6g S 1.3 172 111:08.55 pool-14-thread-
48361 www-data 20 0 17.0t 216.8g 209.6g S 1.3 172 111:06.72 pool-14-thread-
48362 www-data 20 0 17.0t 216.8g 209.6g S 1.3 172 111:02.16 pool-14-thread-
48363 www-data 20 0 17.0t 216.8g 209.6g S 1.3 172 111:07.89 pool-14-thread-
48368 www-data 20 0 17.0t 216.8g 209.6g S 1.3 172 111:09.15 pool-14-thread-
48371 www-data 20 0 17.0t 216.8g 209.6g S 1.3 172 110:59.45 pool-14-thread-
48373 www-data 20 0 17.0t 216.8g 209.6g S 1.3 172 111:23.91 pool-14-thread-
48374 www-data 20 0 17.0t 216.8g 209.6g S 1.3 172 111:19.99 pool-14-thread-
48375 www-data 20 0 17.0t 216.8g 209.6g S 1.3 172 111:16.03 pool-14-thread-</code></pre>
<p>我们目前知道的是,高负载线程叫<code> pool-14-thread-1</code>这种。那只能用MAT分析JVM的dump文件,MAT有分析对象间的引用关系的功能。</p>
<h2>准备工作</h2>
<p>因为生产的机器都是70G内存,会导致dump文件太大,mac没有那么大的内存来分析几十G的内存文件。</p>
<p>所以找了预发环境的机器来dump,命令如下:</p>
<pre><code>jmap -dump:format=b,file=heap.bin [pid]</code></pre>
<p>如果预发环境的机器内存还是太大,只能改-Xms -Xmx了,改成4G左右。</p>
<p>dump好以后gzip做压缩,便于文件传输,从500M压缩到90M左右。</p>
<p>安装lrzsz,通过sz -be 将压缩文件从预发环境下载到本地。(大文件传输很困难,失败了n次)</p>
<p>下载eclipse,下载MAT插件。</p>
<h2>分析</h2>
<p>在MAT的线程视图里找到了<code>pool-14-thread-1</code>,如下图:<br><img src="https://wx2.sinaimg.cn/mw690/9796c2b1ly1geipeh24uoj21qe0ts4qp.jpg" alt="alt text" title="alt text"></p>
<p>点击线程,Incomming References可以查到引用了该对象的对象。如下图:<br><img src="https://wx4.sinaimg.cn/mw690/9796c2b1ly1geipehnn08j21qm0aigsi.jpg" alt="alt text" title="alt text"></p>
<p><img src="https://wx2.sinaimg.cn/mw690/9796c2b1ly1geipei59irj21jo0juk01.jpg" alt="alt text" title="alt text"></p>
<p>通过两次的查询,可以定位到是<code>aa.bb.ccc.hystrix.FallbackExecutor</code>这个对象。</p>
<p>看下该类的源码:</p>
<pre><code>public class FallbackExecutor {
private static final Logger LOG = LoggerFactory.getLogger(FallbackExecutor.class);
// 这里定义线程池,且没有自定义线程名字!!!
private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
public FallbackExecutor() {
}
public static void execute() {
RecoveryStrategy recoveryStrategy = null;
try {
String strategy = ConfigProperties.getProperties().getProperty("recovery.strategy", "aa.bb.xxx.hystrix.DefaultRecoveryStrategy");
recoveryStrategy = (RecoveryStrategy)Class.forName(strategy).newInstance();
LOG.info("instance recoveryStrategy class:{}", recoveryStrategy);
} catch (Exception var4) {
var4.printStackTrace();
LOG.error("recovery.strategy not exist");
}
int interval = Integer.parseInt(ConfigProperties.getProperties().getProperty("recovery.stategy.interval.minutes", "5"));
executor.scheduleWithFixedDelay(recoveryStrategy, (long)interval, (long)interval, TimeUnit.MINUTES);
try {
KafkaSDKFace.storeStrategy = (StoreStrategy)Class.forName(ConfigProperties.getProperties().getProperty("store.strategy", "aa.bb.xxx.hystrix.DefaultStoreStrategy")).newInstance();
LOG.info("instance storeStrategy class:{}", KafkaSDKFace.storeStrategy);
} catch (Exception var3) {
var3.printStackTrace();
LOG.error("store.strategy not exist");
}
executor.scheduleWithFixedDelay(KafkaSDKFace.storeStrategy, (long)interval, (long)interval, TimeUnit.MINUTES);
}
}
</code></pre>
<p>该类主要用于fallback降级。</p>
<p>在repo里查找使用了这个包的地方, <code>xx.yy.zzz.rank.processor.helper.KafkaSdkHelper</code>这个类,代码如下:</p>
<pre><code> public void writeKafka(Object object, String topic, String type) {
try {
if (object != null) {
KafkaSDKFace sdkFace = getKafkaSDKFace(type);
// KafkaSDKFace是那个Jar包里的类!!!
if(sdkFace != null){
Message message = new Message(System.currentTimeMillis(), MapperUtil.MAPPER.writeValueAsString(object));
sdkFace.send(topic, null, message);
}
else {
logger.error("sdkFace is null");
}
}
} catch (Exception e) {
logger.error("kafka error : " + e.getMessage());
logger.error("kafka stack : " + ExceptionUtils.getStackTrace(e));
}
}</code></pre>
<p>到此已经找到了高负载线程的地方</p>
<h2>总结</h2>
<p>在服务中自定义线程池的时候,一定要进行重命名,不要使用默认的线程名生成方式,否则对于后期的问题定位是个极大的干扰。</p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000022555421">https://segmentfault.com/a/11...</a></p>
Log In Action
https://segmentfault.com/a/1190000019342332
2019-05-30T17:34:41+08:00
2019-05-30T17:34:41+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
0
<h2>Log In Action</h2>
<h3>0. 生产环境要关闭debug日志,严禁在生产环境打debug级别日志</h3>
<h3>1. trace/debug/info 日志输出采用占位符的方式,禁止使用字符串拼接的方式</h3>
<p>说明:<code>logger.debug("=====" + b)</code>,如果当前的日志级别是warn,上述日志不会打印,但会执行字符串拼接操作,如果b是对象,会调用b的toString()方法,这样会非常浪费系统资源,特别是当b是个大对象的时候。</p>
<p>bad case:</p>
<pre><code>log.info("finish import seller " + sellerId)
# 有现成的占位符的方式,不要使用String.format(),代码复杂
log.info(String.format("group_id=%s",groupId)) </code></pre>
<p>good case:</p>
<pre><code>logger.info("modify audit status, noteId: [{}], creativityId: [{}]", noteId, creativity.getId());</code></pre>
<p>个人也不推荐条件输出形式,用这种if的方式判断,显然不够优雅</p>
<pre><code>if (logger.isDebugEnabled()) {
logger.debug("Processing trade with id: " + id + " and symbol: " + symbol);
}</code></pre>
<h3>2. 使用[]进行参数变量隔离</h3>
<p>这样的格式写法,可读性更好,对于排查问题又帮助。</p>
<p>good case:</p>
<pre><code>logger.info("modify audit status, noteId: [{}], creativityId: [{}]", noteId, creativity.getId())</code></pre>
<h3>3. 使用warn级别日志记录用户输入参数错误的情况,不要使用error级别日志记录此类错误,避免频繁报警</h3>
<p>bad case:</p>
<pre><code>log.error("Porch: 创建账号失败:传入参数有误:email={}, name={}", email, name)</code></pre>
<p>good case:</p>
<pre><code>logger.warn("创建单元名称重复,unit_name={}", req.getUnitName())</code></pre>
<h3>4. 异常信息应该包含两类信息:案发现场和异常堆栈信息</h3>
<p>bad case:</p>
<pre><code>log.error("调用sellerCenter服务异常")</code></pre>
<p>good case:</p>
<pre><code>logger.error("RPC调用[inventory.multi_get_available]失败", e);</code></pre>
<h3>如果抛出异常,不要记录error日志,由上层进行处理</h3>
<p>如果既打错误日志,又抛出异常,会导致错误日志的重复打印。</p>
<p>bad case:</p>
<pre><code> try {
...
} catch (TException e) {
logger.error("RPC调用[item_center.multi_get_item_union]失败", e);
throw new BaseException(ResponseCode.ITEM_SYSTEM_ERROR);
}</code></pre>
<p>good case:</p>
<pre><code> try {
...
} catch (TException e) {
// e必须传递给重新抛出的异常,否则会导致异常栈的丢失
throw new BaseException("xxxx", e);
}</code></pre>
<h3>logback VS log4j2 性能对比</h3>
<pre><code>linux:
8核 2.4Hz
32G 内存
50个线程,每个线程写2W行日志
logback:
100W行日志总计耗时:7419 ms
100W行日志总计耗时:7337 ms
100W行日志总计耗时:7345 ms
100W行日志总计耗时:7263 ms
100W行日志总计耗时:7084 ms
log4j2:
100W行日志总计耗时:3815 ms
100W行日志总计耗时:3904 ms
100W行日志总计耗时:3743 ms
100W行日志总计耗时:3766 ms
100W行日志总计耗时:3755 ms</code></pre>
<h3>总结</h3>
<p>未来是log4j2的。</p>
<h3>原文链接</h3>
<p><a href="https://segmentfault.com/a/1190000019342332">https://segmentfault.com/a/11...</a></p>
记JVM堆外内存泄漏Bug查找
https://segmentfault.com/a/1190000016574457
2018-09-30T15:57:35+08:00
2018-09-30T15:57:35+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
2
<h2>内存泄漏Bug现场</h2>
<p>一个做<code>BI</code>数据展示的服务在一个晚上重启了5次,由于是通过k8s容器编排,服务挂了以后会自动重启,所以服务还能继续提供服务。</p>
<p>第一时间先上日志系统查看错误日志,发现如下报错:</p>
<pre><code>java.lang.OutOfMemoryError ERROR java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase.start(CloseableHttpAsyncClientBase.java:83)
at org.elasticsearch.client.RestClientBuilder.build(RestClientBuilder.java:190)
at com.xiaohongshu.fls.jbds.handler.JbdsImpl.get_realtime_trend_data_from_es(JbdsImpl.java:637)
at com.xiaohongshu.fls.jbds.handler.JbdsImpl.get_seller_trend(JbdsImpl.java:316)
at com.xiaohongshu.fls.jbds.handler.JbdsImpl$$FastClassBySpringCGLIB$$bd2466f7.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:738)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:85)
at com.xiaohongshu.fls.jbds.aspect.RpcMethodExceptionAspect.requestControllerLog(RpcMethodExceptionAspect.java:33)
at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:629)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:618)
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:673)
at com.xiaohongshu.fls.jbds.handler.JbdsImpl$$EnhancerBySpringCGLIB$$d374b954.get_seller_trend(<generated>)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.xiaohongshu.infra.rpc.core.ThriftServerBaseProxy.call(ThriftServerBaseProxy.java:119)
at com.xiaohongshu.infra.rpc.core.ThriftServerCGlibProxy.intercept(ThriftServerCGlibProxy.java:27)
at com.xiaohongshu.fls.jbds.handler.JbdsImpl$$EnhancerByCGLIB$$19473b8f.get_seller_trend(<generated>)
at com.xiaohongshu.fls.rpc.jbds.JBusinessDataService$Processor$get_seller_trend.getResult(JBusinessDataService.java:1450)
at com.xiaohongshu.fls.rpc.jbds.JBusinessDataService$Processor$get_seller_trend.getResult(JBusinessDataService.java:1435)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:518)
at org.apache.thrift.server.Invocation.run(Invocation.java:18)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) </code></pre>
<p>发现是<code>OOM</code>的错误,并且有<code>unable to create new native thread</code>的错误信息,笔者的第一直觉是创建了大量线程从而导致堆外内存空间不足。</p>
<p>随即去<code>CAT</code>监控系统上去查看线程的活跃情况,如下图:</p>
<p><img src="https://wx1.sinaimg.cn/mw690/9796c2b1gy1fvrjppuvz1j20cs06gwen.jpg" alt="alt text" title="alt text"></p>
<p>发现服务的活跃线程数达到了1W左右,这显然有问题。服务本身是一个<code>Thrift Server</code>,开起的<code>worker</code>线程数为200,再加上一些其他<code>IO</code>线程,总的线程数不会超过300(服务内自己没有显示创建线程或者使用线程池)。</p>
<h2>查找线索</h2>
<p>笔者登录到线上的Docker实例上,通过<code>jmap -histo:live pid</code>命令,查看JVM中存活的对象,输出如下内容:</p>
<pre><code>num #instances #bytes class name
----------------------------------------------
1: 19303 656995672 [B
2: 116670 13942144 [C
3: 15645 5298648 [I
4: 10098 3796848 java.lang.Thread
5: 75817 3639216 java.util.HashMap
6: 113889 2733336 java.lang.String
7: 27521 2421848 java.lang.reflect.Method
8: 68662 2197184 java.util.concurrent.ConcurrentHashMap$Node
9: 10041 1767032 [J
10: 372 1525568 [Ljava.nio.ByteBuffer;
11: 36807 1472280 java.util.LinkedHashMap$Entry
12: 16504 1352488 [Ljava.util.HashMap$Node;
13: 40687 1301984 java.lang.ThreadLocal$ThreadLocalMap$Entry
14: 11495 1277344 java.lang.Class
15: 29903 1196120 java.util.WeakHashMap$Entry
16: 70474 1127584 java.lang.Object
17: 15346 918592 [Ljava.lang.Object;
18: 51556 824896 java.util.HashSet
19: 25528 816896 java.util.HashMap$Node
20: 9989 812176 [Ljava.lang.ThreadLocal$ThreadLocalMap$Entry;
21: 9661 792032 [Ljava.util.WeakHashMap$Entry;
22: 9504 760320 org.apache.http.impl.nio.reactor.BaseIOReactor
23: 30551 733224 java.util.concurrent.ConcurrentLinkedQueue$Node
24: 9914 713808 sun.nio.ch.EPollArrayWrapper
25: 9914 713808 sun.nio.ch.EPollSelectorImpl
26: 29682 712368 java.util.concurrent.ConcurrentLinkedQueue
27: 3525 694672 [Ljava.util.concurrent.ConcurrentHashMap$Node;
28: 15664 501248 java.lang.ref.WeakReference
29: 8364 468384 java.util.LinkedHashMap
30: 9656 463488 java.util.WeakHashMap
31: 20064 436536 [Ljava.lang.Class;
32: 9267 370680 java.security.AccessControlContext
33: 4856 349632 java.lang.reflect.Field
34: 10278 328896 java.lang.ref.ReferenceQueue
35: 9914 317248 sun.nio.ch.AllocatedNativeObject
36: 4955 317120 java.util.concurrent.ConcurrentHashMap
37: 7597 303880 java.lang.ref.SoftReference
38: 9245 293568 [Ljava.security.ProtectionDomain;
39: 11484 275616 java.util.ArrayList
40: 3330 239760 org.springframework.core.annotation.AnnotationAttributes
41: 9989 239736 java.lang.ThreadLocal$ThreadLocalMap
42: 9951 238824 java.util.Collections$SynchronizedSet
43: 9922 238128 java.util.BitSet
44: 9504 228096 org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker
45: 5460 207944 [Ljava.lang.String;
46: 2459 196720 java.lang.reflect.Constructor
47: 12103 193648 java.util.HashMap$KeySet
48: 8003 192072 java.beans.MethodRef
49: 3657 175536 org.aspectj.weaver.reflect.ShadowMatchImpl
50: 10339 165424 java.util.concurrent.atomic.AtomicBoolean
51: 10281 164496 java.lang.ref.ReferenceQueue$Lock
52: 10203 163248 java.util.Collections$UnmodifiableSet
53: 2913 163128 java.beans.MethodDescriptor
54: 2547 161536 [Ljava.lang.reflect.Method;
55: 9914 158624 java.nio.channels.spi.AbstractSelector$1
56: 9913 158608 sun.nio.ch.Util$3
57: 5481 131544 org.springframework.core.MethodClassKey
58: 3707 118624 java.util.LinkedList
59: 3637 116384 org.aspectj.weaver.patterns.ExposedState
60: 2047 114632 java.lang.Class$ReflectionData
61: 1155 110880 org.springframework.beans.GenericTypeAwarePropertyDescriptor
62: 399 109984 [Ljava.lang.Thread;
63: 4387 105288 sun.reflect.generics.tree.SimpleClassTypeSignature
64: 1388 99936 java.beans.PropertyDescriptor
65: 4087 90560 [Ljava.lang.reflect.Type;
66: 5361 85776 org.springframework.core.annotation.AnnotationUtils$DefaultValueHolder
67: 1296 82944 io.netty.buffer.PoolSubpage
68: 4387 82520 [Lsun.reflect.generics.tree.TypeArgument;
69: 674 75488 org.springframework.boot.loader.jar.JarEntry
70: 1848 73920 java.util.TreeMap$Entry
71: 3061 73464 sun.reflect.annotation.AnnotationInvocationHandler
72: 2737 65688 java.util.LinkedList$Node
73: 3989 63824 sun.reflect.generics.tree.ClassTypeSignature
74: 858 61776 org.apache.ibatis.mapping.ResultMapping
75: 832 53248 org.springframework.core.MethodParameter
76: 78 51168 io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueue
77: 2120 50880 java.util.Collections$UnmodifiableRandomAccessList
78: 3165 50640 java.util.LinkedHashMap$LinkedKeySet
79: 2419 49728 [Lsun.reflect.generics.tree.FieldTypeSignature;</code></pre>
<p><img src="https://wx2.sinaimg.cn/mw690/9796c2b1gy1fvrk72nw0sj21kw0ks11v.jpg" alt="alt text" title="alt text"></p>
<p>标红的这个类<code>org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker</code>非常值得怀疑,有9504个实例,从类名看是一个实现了<code>Reactor</code>模式的<code>http</code>客户端,了解<code>Reactor</code>模式的同学都知道,其包含了一个<code>accept</code>线程处理连接事件,<code>N</code>个<code>IO bounding</code>的线程处理<code>read</code>、<code>write</code>事件,和<code>M</code>个<code>worker</code>线程处理业务逻辑。</p>
<h2>进一步查找</h2>
<p>接下来需要查找哪里使用了这个http客户端,在build.gradle的文件里看到依赖了<code>ES</code>的<code>http client</code></p>
<pre><code>group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.3.2'</code></pre>
<p>通过全局搜索,发现了如下的代码:</p>
<pre><code>private String get_realtime_data_from_es (String seller_id) throws IOException{
Date current_date = new Date( );
SimpleDateFormat sdf = new SimpleDateFormat ("yyyyMMdd");
String date=sdf.format(current_date);
RestClient restClient = RestClient.builder(
new HttpHost("xxxxxxxxx", 9200, "http"),
new HttpHost("xxxxxxxxx", 9201, "http")).build();
org.elasticsearch.client.Response response = restClient.performRequest("GET", "/seller/"+date+"/"+seller_id);
restClient.close();
return EntityUtils.toString(response.getEntity());
}</code></pre>
<p>笔者发现这里调用<code>ES</code>的<code>RestClient</code>的非常可疑,居然是每次调用创建一个对象,而不是使用单例模式。</p>
<p>使用过<code>RestTemplate</code>或者<code>HttpClient</code>的同学都知道,<code>Http</code>客户端一般都是通过单例的方式注入到<code>Spring</code>容器中,客户端都是线程安全的,多线程情况下不会出现串包的情况。(具体线程安全的实现机制,可参考笔者之前的博客:《Http请求连接池-HttpClient的AbstractConnPool源码分析》,地址:<a href="https://segmentfault.com/a/1190000012009507">https://segmentfault.com/a/11...</a>)</p>
<h2>源码探究</h2>
<p>直接进到<code>RestClientBuilder</code>类的<code>build()</code>方法中一探究竟,代码如下:</p>
<pre><code>public RestClient build() {
if (failureListener == null) {
failureListener = new RestClient.FailureListener();
}
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
@Override
public CloseableHttpAsyncClient run() {
return createHttpClient();
}
});
RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, hosts, pathPrefix, failureListener);
httpClient.start();
return restClient;
}</code></pre>
<p><code>build</code>方法是实际创建<code>RestClient</code>的地方,设置了超时时间、host等参数。再点进<code>start</code>方法,他是抽象类<code>CloseableHttpAsyncClient</code>的一个抽象方法,具体实现在实现类<code>CloseableHttpAsyncClientBase</code>中,如下:</p>
<pre><code>@Override
public void start() {
if (this.status.compareAndSet(Status.INACTIVE, Status.ACTIVE)) {
if (this.reactorThread != null) {
this.reactorThread.start();
}
}
}</code></pre>
<p>其调用了成员变量<code>reactorThread</code>的<code>start</code>方法,而成员变量<code>reactorThread </code>是一个Thread类,它在构造方法中初始化,如下:</p>
<pre><code>public CloseableHttpAsyncClientBase(
final NHttpClientConnectionManager connmgr,
final ThreadFactory threadFactory,
final NHttpClientEventHandler handler) {
super();
this.connmgr = connmgr;
if (threadFactory != null && handler != null) {
this.reactorThread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
try {
final IOEventDispatch ioEventDispatch = new InternalIODispatch(handler);
connmgr.execute(ioEventDispatch);
} catch (final Exception ex) {
log.error("I/O reactor terminated abnormally", ex);
} finally {
status.set(Status.STOPPED);
}
}
});
} else {
this.reactorThread = null;
}
this.status = new AtomicReference<Status>(Status.INACTIVE);
} </code></pre>
<p>可见,每创建一个<code>CloseableHttpAsyncClient</code>对象,就会创建一个<code>reactorThread</code>线程,而<code>connmgr.execute(ioEventDispatch)</code>是一个永久for循环执行的方法,所以线程的run方法不会主动退出,即,<code>reactorThread</code>线程不会销毁。</p>
<h2>问题解决</h2>
<p>找到bug所在后,结局方案很简单,只需将<code>ES</code>的<code>RestClient</code>通过单例的方式注入到服务中,即可解决堆外内存泄漏的问题。</p>
<h2>总结</h2>
<p>查找bug时,需要从多个方面去定位,通过尽可能多的现场信息去定量分析,监控、线上机器、源码的查看都需要,深入下去,会有不少收获。</p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000016574457">https://segmentfault.com/a/11...</a></p>
从网络IO到Thrift网络模型
https://segmentfault.com/a/1190000016250234
2018-09-03T15:03:28+08:00
2018-09-03T15:03:28+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
3
<h2>I/O多路复用</h2>
<p><code>IO多路复用</code>就是通过一种机制,一个进程可以监听多个文件描述符,一个某个描述符就绪(一般是读就绪或写就绪),就能够通知程序进行相应的读写操作。select、poll、epoll本质上都是同步IO,因为他们需要在读写事件就绪后自己负责读写,即这个读写过程是阻塞的,而异步IO则无需自己负责读写,异步IO的实现会把数据从内核拷贝到用户空间。</p>
<h2>select</h2>
<h4>基本原理</h4>
<p>select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。</p>
<h4>select的缺点</h4>
<ul>
<li>单进程所能打开的文件描述符有一定限制,32位机默认1028,64位机默认2048。</li>
<li>对socket进程扫描时是线性扫描,效率很低。</li>
<li>用来存放文件描述符的数据结构,在用户空间和内核空间的复制开销极大。</li>
</ul>
<h2>poll</h2>
<p>poll与select类似,略过。</p>
<h2>epoll</h2>
<p>epoll是在linux 2.6内核中提出的,是select和poll的增强版本。</p>
<h4>基本原理</h4>
<p>epoll支持水平触发和边缘触发,最大的特点在于边缘触发,它只告诉进程哪些fd刚刚变为就绪态,并且只会通知一次。还有一个特点是,epoll使用“事件”的就绪通知方式,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd。</p>
<h4>epoll的优点</h4>
<ul>
<li>没有最大连接数的限制,1G内存约能监听10W个端口。</li>
<li>不采用轮询的方式,不会随着FD数目的增加效率下降。只有活跃可用的FD才会回调。</li>
<li>内存拷贝,epoll使用mmap减少复制开销。(注:mmap本质就是绕过从网卡、磁盘拷贝数据到内核再拷贝到用户空间的方式,直接从网卡拷贝数据到用户空间,性能爆炸。)</li>
</ul>
<h2>Thrift网络服务模型</h2>
<p>thrift提供的网络服务模型有阻塞服务模型、非阻塞服务模型:</p>
<ul>
<li>阻塞服务模型:TSimpleServer、TThreadPoolServer</li>
<li>非阻塞服务模型:TNonblockingServer、THsHaServer和TThreadedSelectorServer</li>
</ul>
<h3>TSimpleServer</h3>
<p>该模式采用最简单的阻塞IO,一次只能接收并处理一个socket,处理流程如下:<br><img src="/img/remote/1460000016250237?w=690&h=920" alt="alt text" title="alt text"><br>此种模式效率低下,生产不会使用,略过。</p>
<h3>TThreadPoolServer</h3>
<p>TThreadPoolServer模式采用阻塞socket方式工作,主线程负责<code>阻塞式</code>(划重点,不是select的方式)监听是否有新socket到来,具体的业务处理交由一个线程池来处理。</p>
<p>accept部分的代码如下:</p>
<pre><code> protected TSocket acceptImpl() throws TTransportException {
if (serverSocket_ == null) {
throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
}
try {
// 阻塞式监听新的连接
Socket result = serverSocket_.accept();
TSocket result2 = new TSocket(result);
result2.setTimeout(clientTimeout_);
return result2;
} catch (IOException iox) {
throw new TTransportException(iox);
}
}</code></pre>
<p>具体模型如下:<br><img src="/img/remote/1460000016250238?w=690&h=920" alt="alt text" title="alt text"></p>
<p>TThreadPoolServer本质是<code>One Thread Per Connection</code>模型。模型受限于线程池的最大线程数,在连接数很大话,请求只能排队,对于高并发的场景,此模型并不合适。</p>
<h3>TNonblockingServer</h3>
<p>TNonblockingServer模式也是单线程工作,但是采用NIO的模式,借助Channel/Selector机制, 采用IO事件模型来处理。本质是一种<code>event-loop</code>模型。</p>
<p>具体模型如下:<br><img src="/img/remote/1460000016250239?w=690&h=920" alt="alt text" title="alt text"></p>
<p>event-loop的核心代码如下:</p>
<pre><code> private void select() {
try {
// 等待事件,jdk7之前的版本存在问题,会存在会将CPU打满的情况,没有事件,select却返回,从而将CPU打满;Netty中通过threshold,解决了该问题
selector.select();
// 获取IO事件
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
// if the key is marked Accept, then it has to be the server
// transport.
// 处理连接事件
if (key.isAcceptable()) {
handleAccept();
} else if (key.isReadable()) {
// deal with reads
// 处理读事件
handleRead(key);
} else if (key.isWritable()) {
// deal with writes
// 处理写事件
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}</code></pre>
<p>这个模型一般由一个event dispatcher等待各类事件,待事件发生后原地调用对应的event handler,全部调用完后等待更多事件,故为"loop"。这个模型的实质是把多段逻辑按事件触发顺序交织在一个系统线程中。一个event-loop只能使用一个核,故此类程序要么是IO-bound,要么是每个handler有确定的较短的运行时间(比如http server),否则一个耗时漫长的回调就会卡住整个程序,产生高延时。在实践中这类程序不适合多开发者参与,一个人写了阻塞代码可能就会拖慢其他代码的响应。由于event handler不会同时运行,不太会产生复杂的race condition,一些代码不需要锁。此类程序主要靠部署更多进程增加扩展性。</p>
<h3>THsHaServer</h3>
<p>THsHaServer继承于TNonblockingServer,引入了线程池提高了任务处理的并发能力。THsHaServer是半同步半异步(Half-Sync/Half-Async)的处理模式,Half-Aysnc用于IO事件处理(Accept/Read/Write),Half-Sync用于业务handler对rpc的同步处理上。</p>
<p>具体模型如下:</p>
<p><img src="/img/remote/1460000016250240?w=690&h=920" alt="alt text" title="alt text"></p>
<p>THsHaServer与TNonblockingServer模式相比,THsHaServer在完成数据读取之后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操作,效率大大提升。</p>
<p>但是,主线程仍然需要处理accpet、read、write时间,当并发量非常大,读取或者发送的数据量比较大时,会将主线程阻塞住,新的连接无法被及时处理。</p>
<h3>TThreadedSelectorServer</h3>
<p>TThreadedSelectorServer是对THsHaServer的一种改进,它将selector中的read/write事件从主线程中剥离出来。</p>
<p>TThreadedSelectorServer是thrift提供的最高效的网络模型。具体模型如下:<br><img src="/img/remote/1460000016250241?w=690&h=920" alt="alt text" title="alt text"></p>
<p>构成如下:</p>
<ul>
<li>一个Accpet线程,专门用来处理新的socket</li>
<li>n个SelectorThread,用来处理read/write事件,读取、返回数据都是有这些线程完成的,每个SelectorThread会与若干个socket绑定,每个SelectorThread会处理与它绑定socket的read/write事件。</li>
<li>一个负载均衡器SelectorThreadLoadBalancer对象,在accpet线程接收到新的socket以后,由SelectorThreadLoadBalancer决定将socket与哪个SelectorThread绑定(其实就是一个next函数,每分配一个socket,就调用next)。</li>
<li>一个ExecutorService类型的worker线程池,在SelectorThread读取数据之后,将其包装成一个task,分配给worker线程池,处理业务逻辑。</li>
</ul>
<p>总结:TThreadedSelectorServer模式,其实就是标准的Reactor模式,Tomcat7以后的版本、Cobar、MyCat(分库分表proxy)基本都是这个套路,具体实现略有差异。</p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000016250234">https://segmentfault.com/a/11...</a></p>
对Java多线程的一些理解
https://segmentfault.com/a/1190000015344164
2018-06-21T13:15:24+08:00
2018-06-21T13:15:24+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
4
<h2>OS中的进程、线程</h2>
<ul>
<li>进程:即处于执行期的程序,且包含其他资源,如打开的文件、挂起的信号、内核内部数据、处理器状态、内核地址空间、一个或多个执行的线程、数据段。</li>
<li>线程:进程中的活动对象,内核调度的对象不是进程而是线程;传统Unix系统一个进程只包含一个线程。</li>
</ul>
<h2>线程在Linux中的实现</h2>
<p>从Linux内核的角度来说,并没有线程这个概念。Linux把所有的线程都当做进程来实现,内核没有为线程准备特别的调度算法和特别的数据结构。线程仅仅被视为一个与其他进程共享某些资源的进程。所以,在内核看来,它就是一个普通的进程。</p>
<p>在Windows或Solaris等操作系统的实现中,它们都提供了专门支持线程的机制(<code>lightweight processes</code>)。</p>
<h3>写时拷贝</h3>
<p>传统的fork()系统调用直接把所有资源复制给新创建的进程,效率十分低下,因为拷贝的数据也许并不需要。</p>
<p>Linux的fork()使用写时拷贝实现。内核此时并不复制整个进程地址空间,而是让父进程和子进程共享一个拷贝。</p>
<p>只有在需要写入的时候,数据才会被复制,在此之前,只是以只读方式共享。这种优化可以避免拷贝大量根本就不会被使用的数据(地址空间常常包含几十M的数据)。</p>
<p>因此,Linux创建进程和线程的区别就是共享的地址空间、文件系统资源、文件描述符、信号处理程序等这些不同。</p>
<p>以下是StackOverflow上的一个答案:</p>
<p><img src="/img/remote/1460000015344167?w=690&h=483" alt="alt text" title="alt text"></p>
<p>即,在<code>Linux</code>下,进程使用<code>fork()</code>创建,线程使用<code>pthread_create()</code>创建;<code>fork()</code>和<code>pthread_create()</code>都是通过<code>clone()</code>函数实现,只是传递的参数不同,即共享的资源不同。(<code>Linux</code>是通过<code>NPTL</code>实现<code>POSIX Thread</code>规范,即通过轻量级进程实现<code>POSIX Thread</code>,使之前在<code>Unix</code>上的库、软件可以平稳的迁移到<code>Linux</code>上)</p>
<h2>Java线程如何映射到OS线程</h2>
<p>JVM在linux平台上创建线程,需要使用pthread 接口。pthread是POSIX标准的一部分它定义了创建和管理线程的C语言接口。Linux提供了pthread的实现:</p>
<pre><code>pthread_t tid;
if (pthread_create(&tid, &attr, thread_entry_point, arg_to_entrypoint))
{
fprintf(stderr, "Error creating thread\n");
return;
}</code></pre>
<ul>
<li>
<code>tid</code>是新创建线程的ID</li>
<li>
<code>attr</code>是我们需要设置的线程属性</li>
<li>
<code>thread_entry_point</code>是会被新创建线程调用的函数指针</li>
<li>
<code>arg_to_entrypoint</code>是会被传递给<code>thread_entry_point</code>的参数</li>
</ul>
<p><code>thread_entry_point</code>所指向的函数就是Thread对象的run方法。</p>
<h2>无返回值线程和带返回值的线程</h2>
<ul>
<li>无返回值:一种是直接继承Thread,另一种是实现Runnable接口</li>
<li>带返回值:通过Callable和Future实现</li>
</ul>
<p>带返回值的线程是我们在实践中更常用的。</p>
<h2>竞态条件</h2>
<p>当某个计算的正确性取决于多个线程的交替执行时序时,那么就会发生竞态条件。</p>
<p>最常见的竞态条件类型就是“先检查后执行”(<code>Check-Then-Act</code>)操作,即通过一个可能失效的观测结果来决定下一步的动作。</p>
<p>使用“”先检查后执行“的一种常见情况就是延迟初始化:</p>
<pre><code>public class LazyInitRace {
private ExpensiveObject instance = null;
public ExpensiveObject getInstance() {
if (instance == null) {
instance = new ExpensiveObject();
}
return instance;
}
}</code></pre>
<p>不要这么做。</p>
<h2>Executor框架</h2>
<h3>使用裸线程的缺点</h3>
<p>在<code>prod</code>环境中,为<code>每个任务分配一个线程</code>的方法存在严重的缺陷,尤其是当需要创建大量的线程时:</p>
<ul>
<li>线程生命周期的开销非常高:线程的创建与销毁并不是没有代价的。</li>
<li>资源消耗:会消耗内存和CPU,大量的线程竞争CPU资源将产生性能开销。如果你已经拥有足够多的线程使所有CPU处于忙碌状态,那么创建更多的线程反而会降低性能。</li>
<li>稳定性:可创建的线程的数量上存在限制,包括<code>JVM</code>的启动参数、操作系统对线程的限制,如果超出这些限制,很可能会抛出<code>OutOfMemoryError</code>异常。</li>
</ul>
<h3>Executor基本原理</h3>
<p><code>Executor</code>基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程则相当于消费者。</p>
<p>线程池的构造函数如下:</p>
<pre><code> public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}</code></pre>
<h4>线程池大小</h4>
<ul>
<li>
<code>corePoolSize</code>:核心线程数,当线程池的线程数小于<code>corePoolSize</code>,直接创建新的线程</li>
<li>线程数大于<code>corePoolSize</code>但是小于<code>maximumPoolSize</code>:如果任务队列还未满, 则会将此任务插入到任务队列末尾;如果此时任务队列已满, 则会创建新的线程来执行此任务。</li>
<li>线程数等于<code>maximumPoolSize</code>:如果任务队列还未满, 则会将此任务插入到任务队列末尾;如果此时任务队列已满, 则会由<code>RejectedExecutionHandler</code>处理。</li>
</ul>
<h4>keep-alive</h4>
<ul><li>
<code>keepAliveTime</code>:当我们的线程池中的线程数大于<code>corePoolSize</code>时, 如果此时有线程处于空闲(Idle)状态超过指定的时间(<code>keepAliveTime</code>), 那么线程池会将此线程销毁。</li></ul>
<h4>工作队列</h4>
<p>工作队列(<code>WorkQueue</code>)是一个<code>BlockingQueue</code>, 它是用于存放那些已经提交的, 但是还没有空余线程来执行的任务。</p>
<p>常见的工作队列有一下几种:</p>
<ul>
<li>直接切换(<code>Direct handoffs</code>)</li>
<li>无界队列(<code>Unbounded queues</code>)</li>
<li>有界队列(<code>Bounded queues</code>)</li>
</ul>
<p>在生产环境中,禁止使用无界队列,因为当队列中堆积的任务太多时,会消耗大量内存,最后<code>OOM</code>;通常都是设定固定大小的有界队列,当线程池已满,队列也满的情况下,直接将新提交的任务拒绝,抛<code>RejectedExecutionException </code>出来,本质上这是对服务自身的一种保护机制,当服务已经没有资源来处理新提交的任务,因直接将其拒绝。</p>
<h2>Java原生线程池在生产环境中的问题</h2>
<p>在服务化的背景下,我们的框架一般都会集成<code>全链路追踪</code>的功能,用来串联整个调用链,主要是记录<code>TraceId</code>和<code>SpanId</code>;<code>TraceId</code>和<code>SpanId</code>一般都记录在<code>ThreadLocal</code>中,对业务方来说是透明的。</p>
<p>当在同一个线程中同步RPC调用的时候,不会存在问题;但如果我们使用线程池做客户端异步调用时,就会导致<code>Trace</code>信息的丢失,根本原因是<code>Trace</code>信息无法从主线程的<code>ThreadLocal</code>传递到线程池的<code>ThreadLocal</code>中。</p>
<p>对于这个痛点,阿里开源的<code>transmittable-thread-local</code>解决了这个问题,实现其实不难,可以阅读一下源码:</p>
<pre><code>https://github.com/alibaba/transmittable-thread-local</code></pre>
<h2>性能与伸缩性</h2>
<h3>对性能的思考</h3>
<p>提升性能意味着用更少的资源做更多的事情。“资源”的含义很广,例如<code>CPU</code>时钟周期、内存、网络带宽、磁盘空间等其他资源。当操作性能由于某种特定的资源而受到限制时,我们通常将该操作称为资源密集型的操作,例如,CPU密集型、IO密集型等。</p>
<p>使用多线程理论上可以提升服务的整体性能,但与单线程相比,使用多线程会引入额外的性能开销。包括:线程之间的协调(例如加锁、触发信号以及内存同步),增加的上下文切换,线程的创建和销毁,以及线程的调度等。如果过度地使用线程,其性能可能甚至比实现相同功能的串行程序更差。</p>
<p>从性能监视的角度来看,CPU需要尽可能保持忙碌状态。如果程序是计算密集型的,那么可以通过增加处理器来提升性能。但如果程序无法使CPU保持忙碌状态,那增加更多的处理器也是无济于事的。</p>
<h3>可伸缩性</h3>
<p>可伸缩性是指:当增加计算资源时(例如CPU、内存、存储容量、IO带宽),程序的吞吐量或者处理能力能响应的增加。</p>
<p>我们熟悉的三层模型,即程序中的表现层、业务逻辑层和持久层是彼此独立,并且可能由不同的服务来处理,这很好地说明了提高伸缩性通常会造成性能损失。如果把表现层、业务逻辑层和持久层都融合到某个单体应用中,在负载不高的时候,其性能肯定要高于将应用程序分为多层的性能。这种单体应用避免了在不同层次之间传递任务时存在的网络延迟,减少了很多开销。</p>
<p>然而、当单体应用达到自身处理能力的极限时,会遇到一个严重问题:提升它的处理能力非常困难,即无法水平扩展。</p>
<h3>Amdahl定律</h3>
<p>大多数并发程序都是由一系列的并行工作和串行工作组成的。<code>Amdahl</code>定律描述的是:在增加计算资源的情况下,程序在理论上能够实现最高加速比,这个值取决于程序中<code>可并行组件</code>与<code>串行组件</code>所占的比重。假定<code>F</code>是必须被串行执行的部分,那么根据<code>Amdahl</code>定律,在包含N个处理器的机器上,最高的加速比为:</p>
<p><img src="/img/remote/1460000015344168" alt="alt text" title="alt text"></p>
<p>当N趋近于无穷大时,最大的加速比趋近于<code>1/F</code>。因此,如果程序中有50%的计算需要串行执行,那么最高的加速比只能是2。</p>
<h3>上下文切换</h3>
<p>线程调度会导致上下文切换,而上下文切换是会产生开销的。若是<code>CPU密集型</code>程序产生大量的线程切换,将会降低系统的吞吐量。</p>
<p><code>UNIX</code>系统的<code>vmstat</code>命令能够报告上下文切换次数以及在内核中执行时间的所占比例等信息。如果内核占用率较高(超过10%),那么通常表示调度活动发生得很频繁,这很可能是由<code>I/O</code>或者锁竞争导致的阻塞引起的。</p>
<pre><code>>> vmstat
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
r b swpd free buff cache si so bi bo in cs us sy id wa st
1 0 0 3235932 238256 3202776 0 0 0 11 7 4 1 0 99 0 0
cs:每秒上下文切换次数
sy:内核系统进程执行时间百分比
us:用户进程执行时间百分比</code></pre>
<p>以上。</p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000015344164">https://segmentfault.com/a/11...</a></p>
CMS垃圾回收和线上Full GC排查
https://segmentfault.com/a/1190000015182001
2018-06-05T14:42:35+08:00
2018-06-05T14:42:35+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
1
<h2>背景</h2>
<p>我们上线Java服务的时候需要对其配置一些JVM参数,如堆空间大小、虚拟机栈大小、垃圾回收算法。对于年轻代和老年代我们可以配置不同的垃圾回收算法。在一些对<code>rt</code>要求很高的场景,服务不能有长时间的卡顿,<code>CMS</code>就可以运用于此场景。</p>
<p><code>Concurrent Mark Sweep</code>,是一款基于并发、使用标记清除算法的垃圾回收算法,只针对老年代进行垃圾回收。CMS收集器工作时,GC工作线程和用户线程可以并发执行,以达到降低<code>STW</code>时间的目的。</p>
<p>开起VM选项<code>-XX:+UseConcMarkSweepGC</code>,表示对老年代的回收采用CMS。</p>
<h2>前置知识</h2>
<h3>STW</h3>
<p>首先,我们需要厘清一个概念,即只有<code>标记</code>阶段才需要<code>STW</code>。标记完成以后,需要清除的对象已经确定,无论此时是否产生新的垃圾,都不影响对这些对象的清理。也就是说,<code>清除</code>阶段是可以设计成和用户线程并发执行的。</p>
<p>JVM在暂停的时候,需要选准一个时机,由于JVM系统运行期间的复杂性,不可能做到随时暂停,因此引入了<code>安全点(safepoint)</code>的概念:程序只有在运行到安全点的时候,才可以暂停下来。<code>HotSpot</code>采用主动中断的方式,让执行线程在运行期轮询是否需要暂停的标志,若需要则中断挂起。<code>HotSpot</code>使用了几条短小精炼的汇编指令便可完成安全点轮询以及触发线程中断,因此对系统性能的影响几乎可以忽略不计。</p>
<h3>可达性</h3>
<p><code>可达性</code>是指,如果一个对象会被至少一个程序中的可达对象通过直接或间接的方式引用,则称该对象是<code>可达的</code>。更详细地说,一个对象满足一下两个条件之一,即被判定为可达的。</p>
<p>1.本身是根对象。根(root)是指由堆以外空间访问的对象。JVM会将以下对象标记为根:a.虚拟机栈(栈帧中的本地变量表)中引用的对象;b.方法区中的类静态属性引用的对象;c.方法区中的常量引用的对象;d.本地方法栈中JNI的引用对象。</p>
<p>2.被一个可达的对象引用。</p>
<h2>CMS的几个阶段</h2>
<p><code>CMS</code>将可达性分析分解成两个阶段:a.仅扫描与根节点直接关联的对象; b.继续向下扫描完所有对象。因此,<code>标记</code>阶段也被拆分成两个阶段,即<code>初始标记</code>和<code>并发标记</code>。</p>
<p>CMS完整的收集过程如下:</p>
<ol>
<li>
<code>初始标记(init-mark)</code>:仅扫描与根节点直接关联的对象并标记,这个阶段必须<code>STW</code>, 由于跟节点数量有限,所以这个过程非常短暂。</li>
<li>
<code>并发标记(concurrent-marking)</code>:与用户线程并发标记。这个阶段在初始标记的基础上继续向下追溯标记。在并发标记阶段,用户线程和标记线程并发执行,所以用户不会感受到停顿。</li>
<li>
<code>并发预清理(concurrent-precleaning)</code>:与用户线程并发进行。在并发标记阶段一些对象的引用已经发生了变化,<code>precleaning</code>会发现这些引用关系的改变,并将存活的对象标记。举个例子:如果线程A有一个指向对象X的引用,并将该引用传递给了线程B,CMS需要记录下线程B持有了对象X,即使线程A已经不存在了。<code>precleaning</code>是为了减少下一阶段“重新标记”的工作量,因为<code>remark</code>阶段会<code>STW</code>。</li>
<li>
<code>重新标记(remark)</code>:<code> remark</code>阶段会<code>STW</code>。如果应用正在并发运行且在不断地改变对象引用,<code>CMS</code>则不能准确地确定某个对象是否存活。所以<code>CMS</code>会在<code>remark</code>阶段<code>STW</code>,从而获取所有引用关系的改变。</li>
<li>
<code>并发清理(concurrent-sweeping)</code>:清理垃圾对象,这个阶段GC线程和用户线程并发执行。</li>
<li>
<code>并发重置(concurrent-reset)</code>:重置CMS收集器的数据结构,做好下一次执行GC任务的准备工作。</li>
</ol>
<p><img src="/img/remote/1460000015182004" alt="alt text" title="alt text"></p>
<h2>线上Full GC分析</h2>
<p>线上某服务的老年代配置了CMS,但却在gc.log发现连续Full GC的问题。JVM参数配置如下:</p>
<pre><code>-XX:+UseCMSInitiatingOccupancyOnly
-XX:CMSInitiatingOccupancyFraction=68</code></pre>
<p>参数的意义是:在老年代到<code>68%</code>的时候,会触发一次CMS GC,应该是出现类似如下的日志:</p>
<pre><code>T20:10:37.803+0800: 3246087.559: [CMS-concurrent-mark-start]
T20:10:38.463+0800: 3246088.220: [CMS-concurrent-mark: 0.661/0.661 secs] [Times: user=3.17 sys=0.56, real=0.66 secs]
T20:10:38.463+0800: 3246088.220: [CMS-concurrent-preclean-start]
T20:10:38.552+0800: 3246088.309: [CMS-concurrent-preclean: 0.069/0.089 secs] [Times: user=0.14 sys=0.04, real=0.09 secs]_</span>
T20:10:38.552+0800: 3246088.309: [CMS-concurrent-abortable-preclean-start]</code></pre>
<p>但线上环境的日志却出现如下的情况:</p>
<p><img src="/img/remote/1460000015182005" alt="alt text" title="alt text"></p>
<p>老年代配置了900M,但却在只使用了50+M的时候触发了Full GC,而且是在短暂的时间内连续触发。</p>
<p>配置了CMS却触发Full GC,有以下几种可能:</p>
<ol>
<li>大对象分配时,年轻代不够,直接晋升到老年代,老年代空间也不够,触发 Full GC(老年代还剩800+M,显然不可能)</li>
<li>内存碎片导致(由于CMS是基于标记清除算法的,所以会导致内存碎片,但通过grep -i "cms" gc.log,JVM尚未触发过CMS回收,所以也不存在内存碎片的说法)</li>
<li>CMS GC失败导致(从gc.log并未找到concurrent mode failure的记录,排除)</li>
<li>jmap -histo(人为执行该命令)</li>
</ol>
<p>经笔者回忆,在中午快12点的时候确实登录过线上机,执行过<code>jmap -histo:live</code>命令,经验证,手动执行<code>jmap -histo:live</code>,也确实会在gc.log出现触发 Full GC的现象,问题得到验证。</p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000015182001">https://segmentfault.com/a/11...</a></p>
Spring BeanUtils源码分析
https://segmentfault.com/a/1190000014833730
2018-05-12T16:11:23+08:00
2018-05-12T16:11:23+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
11
<h2>背景</h2>
<p>在我们着手一个Java Web项目的时候,经常会遇到<code>DO、VO、DTO</code>对象之间的属性拷贝,若采用<code>get、set</code>的方法来进行赋值的话,代码会相当冗长丑陋,一般我们会采用<code>Spring</code>的<code>BeanUtils</code>类来进行属性拷贝,其基本原理就是通过Java的反射机制,下面我们来看一下源码的具体实现。</p>
<h2>前置知识</h2>
<p>在分析源码前,我们先温习一下以下的知识点。</p>
<h3>java.lang.Class类</h3>
<p>在Java中万物皆对象,而且我们在代码中写的每一个类也都是对象,是<code>java.lang.Class</code>类的对象。所以,每个类都有自己的实例对象,而且它们自己也都是<code>Class</code>类的对象。</p>
<p>我们来看一下<code>Class</code>类的构造方法:</p>
<pre><code>private Class(ClassLoader loader) {
// Initialize final field for classLoader. The initialization value of non-null
// prevents future JIT optimizations from assuming this final field is null.
classLoader = loader;
}</code></pre>
<p>Class类的构造方法是私有的,只有JVM可以创建该类的对象,因此我们无法在代码中通过<code>new</code>的方式显示声明一个Class对象。</p>
<p>但是,我们依然有其他方式获得Class类的对象:</p>
<p>1.通过类的静态成员变量</p>
<pre><code>Class clazz = Test.class;</code></pre>
<p>2.通过对象的getClass()方法</p>
<pre><code>Class clazz = test.getClass();</code></pre>
<p>3.通过Class的静态方法forName()</p>
<pre><code>// forName需要传入类的全路径
Class clazz = Class.forName("destiny.iron.api.model.Test"); </code></pre>
<h3>基本类型和包装类型</h3>
<p>基本类型和其对应的包装类的Class对象是不相等的,即<code>long.class != Long.class </code>。</p>
<h3>PropertyDescriptor类</h3>
<p><code>PropertyDescriptor</code>类表示的是标准形式的Java Bean通过存取器(即get set方法)导出的一个属性,比如,我们可以通过以下方式,对对象的属性进行赋值:</p>
<pre><code>public class Person {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
'}';
}
public static void main(String[] args) throws Exception {
Person test1 = new Person();
test1.setName("vvvv");
PropertyDescriptor pd = new PropertyDescriptor("name", test1.getClass());
Method setMethod = pd.getWriteMethod(); // 还有与Wirte方法对应的Read方法
setMethod.invoke(test1, "bbbbb");
System.out.print(test1);
}
}</code></pre>
<h3>引用类型</h3>
<p>Java中有<code>strong、soft、weak、phantom</code>四种引用类型,下面介绍一下soft引用和weak引用:</p>
<ul>
<li>
<code>Soft Reference</code>: 当对象是<code>Soft reference</code>可达时,向系统申请更多内存,GC不是直接回收它,而是当内存不足的时候才回收它。因此Soft reference适合用于构建一些缓存系统。</li>
<li>
<code>Weak Reference</code>: 弱引用的强度比软引用更弱一些,被弱引用关联的对象只能生存到下一次GC发生之前。当垃圾收集器工作时,无论当前内存是否足够,都会回收掉只被弱引用关联的对象。</li>
</ul>
<h2>源码分析</h2>
<pre><code>private static void copyProperties(Object source, Object target, Class<?> editable, String... ignoreProperties)
throws BeansException {
// 检查source和target对象是否为null,否则抛运行时异常
Assert.notNull(source, "Source must not be null");
Assert.notNull(target, "Target must not be null");
// 获取target对象的类信息
Class<?> actualEditable = target.getClass();
// 若editable不为null,检查target对象是否是editable类的实例,若不是则抛出运行时异常
// 这里的editable类是为了做属性拷贝时限制用的
// 若actualEditable和editable相同,则拷贝actualEditable的所有属性
// 若actualEditable是editable的子类,则只拷贝editable类中的属性
if (editable != null) {
if (!editable.isInstance(target)) {
throw new IllegalArgumentException("Target class [" + target.getClass().getName() +
"] not assignable to Editable class [" + editable.getName() + "]");
}
actualEditable = editable;
}
// 获取目标类的所有PropertyDescriptor,getPropertyDescriptors这个方法请看下方
PropertyDescriptor[] targetPds = getPropertyDescriptors(actualEditable);
List<String> ignoreList = (ignoreProperties != null ? Arrays.asList(ignoreProperties) : null);
for (PropertyDescriptor targetPd : targetPds) {
// 获取该属性对应的set方法
Method writeMethod = targetPd.getWriteMethod();
// 属性的set方法存在 且 该属性不包含在忽略属性列表中
if (writeMethod != null && (ignoreList == null || !ignoreList.contains(targetPd.getName()))) {
// 获取source类相同名字的PropertyDescriptor, getPropertyDescriptor的具体实现看下方
PropertyDescriptor sourcePd = getPropertyDescriptor(source.getClass(), targetPd.getName());
if (sourcePd != null) {
// 获取对应的get方法
Method readMethod = sourcePd.getReadMethod();
// set方法存在 且 target的set方法的入参是source的get方法返回值的父类或父接口或者类型相同
// 具体ClassUtils.isAssignable()的实现方式请看下面详解
if (readMethod != null &&
ClassUtils.isAssignable(writeMethod.getParameterTypes()[0], readMethod.getReturnType())) {
try {
//get方法是否是public的
if (!Modifier.isPublic(readMethod.getDeclaringClass().getModifiers())) {
//暴力反射,取消权限控制检查
readMethod.setAccessible(true);
}
//获取get方法的返回值
Object value = readMethod.invoke(source);
// 原理同上
if (!Modifier.isPublic(writeMethod.getDeclaringClass().getModifiers())) {
writeMethod.setAccessible(true);
}
// 将get方法的返回值 赋值给set方法作为入参
writeMethod.invoke(target, value);
}
catch (Throwable ex) {
throw new FatalBeanException(
"Could not copy property '" + targetPd.getName() + "' from source to target", ex);
}
}
}
}
}
}</code></pre>
<p><code>getPropertyDescriptors</code>源码:</p>
<pre><code> public static PropertyDescriptor[] getPropertyDescriptors(Class<?> clazz) throws BeansException {
// CachedIntrospectionResults类是对PropertyDescriptor的一个封装实现,看forClass方法的实现
CachedIntrospectionResults cr = CachedIntrospectionResults.forClass(clazz);
return cr.getPropertyDescriptors();
}
@SuppressWarnings("unchecked")
static CachedIntrospectionResults forClass(Class<?> beanClass) throws BeansException {
// strongClassCache的声明如下:
// strongClassCache = new ConcurrentHashMap<Class<?>, CachedIntrospectionResults>(64);
// 即将Class作为key,CachedIntrospectionResults作为value的map,
// 由于线程安全的需要,使用ConcurrentHashMap作为实现
CachedIntrospectionResults results = strongClassCache.get(beanClass);
if (results != null) {
return results;
}
// 若strongClassCache中不存在,则去softClassCache去获取,softClassCache的声明如下
// softClassCache = new ConcurrentReferenceHashMap<Class<?>, CachedIntrospectionResults>(64);
// ConcurrentReferenceHashMap是Spring实现的可以指定entry引用级别的ConcurrentHashMap,默认的引用级别是soft,可以防止OOM
results = softClassCache.get(beanClass);
if (results != null) {
return results;
}
results = new CachedIntrospectionResults(beanClass);
ConcurrentMap<Class<?>, CachedIntrospectionResults> classCacheToUse;
// isCacheSafe方法检查给定的beanClass是否由入参中的classloader或者此classloader的祖先加载的(双亲委派的原理)
// isClassLoaderAccepted检查加载beanClass的classloader是否在可以接受的classloader的集合中 或者是集合中classloader的祖先
if (ClassUtils.isCacheSafe(beanClass, CachedIntrospectionResults.class.getClassLoader()) ||
isClassLoaderAccepted(beanClass.getClassLoader())) {
classCacheToUse = strongClassCache;
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Not strongly caching class [" + beanClass.getName() + "] because it is not cache-safe");
}
classCacheToUse = softClassCache;
}
// 根据classloader的结果,将类信息加载到对应的缓存中
CachedIntrospectionResults existing = classCacheToUse.putIfAbsent(beanClass, results);
return (existing != null ? existing : results);
}</code></pre>
<p><code>isAssignable</code>源码:</p>
<pre><code> public static boolean isAssignable(Class<?> lhsType, Class<?> rhsType) {
Assert.notNull(lhsType, "Left-hand side type must not be null");
Assert.notNull(rhsType, "Right-hand side type must not be null");
// 若左边类型 是右边类型的父类、父接口,或者左边类型等于右边类型
if (lhsType.isAssignableFrom(rhsType)) {
return true;
}
// 左边入参是否是基本类型
if (lhsType.isPrimitive()) {
//primitiveWrapperTypeMap是从包装类型到基本类型的map,将右边入参转化为基本类型
Class<?> resolvedPrimitive = primitiveWrapperTypeMap.get(rhsType);
if (lhsType == resolvedPrimitive) {
return true;
}
}
else {
// 将右边入参转化为包装类型
Class<?> resolvedWrapper = primitiveTypeToWrapperMap.get(rhsType);
if (resolvedWrapper != null && lhsType.isAssignableFrom(resolvedWrapper)) {
return true;
}
}
return false;
}</code></pre>
<p><code>ClassUtils.isAssignable()</code>方法扩展了<code>Class的isAssignableFrom()</code>方法,即将<code>Java</code>的基本类型和包装类型做了兼容。</p>
<h2>总结</h2>
<p>一个看似简单的<code>BeanUtils</code>工具类,其实里面包含的Java基础的知识点非常多,包括类型信息、反射、线程安全、引用类型、类加载器等。<code>Spring</code>的<code>BeanUtils</code>的实现里使用了<code>ConcurrentHashMap</code>作为缓存,每次去获取<code>PropertyDescriptor</code>时,可以直接去缓存里面获取,而不必每次都去调用<code>native</code>方法,所以<code>Spring</code>的<code>BeanUtils</code>的性能还是很不错的。</p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000014833730">https://segmentfault.com/a/11...</a></p>
浅析MySQL二段锁
https://segmentfault.com/a/1190000012513286
2017-12-20T20:26:45+08:00
2017-12-20T20:26:45+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
10
<h2>背景</h2>
<p>虽然两阶段加锁(2PL)听起来和两阶段提交(two-phase commit, 2PC)很相似,但它们是完全不同的东西。</p>
<p>在介绍<code>MySQL二段锁</code>之前,我需要理清一下概念,即<code>MySQL</code>二阶段加锁与二阶段提交的区别:</p>
<pre><code>二阶段加锁:用于单机事务中的一致性和隔离性
二阶段提交:用于分布式事务</code></pre>
<h2>何为二段锁</h2>
<p>在一个事务操作中,分为<code>加锁阶段</code>和<code>解锁阶段</code>,且所有的加锁操作在解锁操作之前,具体如下图所示:</p>
<p><img src="/img/remote/1460000012513291" alt="alt text" title="alt text"></p>
<h2>加锁时机</h2>
<p>当对记录进行更新操作或者<code>select for update(X锁)、lock in share mode(S锁)</code>时,会对记录进行加锁,锁的种类很多,不在此赘述。</p>
<h2>何时解锁</h2>
<p>在一个事务中,只有在<code>commit</code>或者<code>rollback</code>时,才是解锁阶段。</p>
<h2>二阶段加锁最佳实践</h2>
<p>下面举个具体的例子,来讲述二段锁对应用性能的影响,我们举个库存扣减的例子:</p>
<pre><code>方案一:
start transaction;
// 锁定用户账户表
select * from t_accout where acount_id=234 for update
//生成订单
insert into t_trans;
// 减库存
update t_inventory set num=num-3 where id=${id} and num>=3;
commit;</code></pre>
<pre><code>方案二:
start transaction;
// 减库存
update t_inventory set num=num-3 where id=${id} and num>=3;
// 锁定用户账户表
select * from t_accout where acount_id=234 for update
//生成订单
insert into t_trans;
commit;</code></pre>
<p>我们的应用通过<code>JDBC</code>操作数据库时,底层本质上还是走<code>TCP</code>进行通信,<code>MySQL协议</code>是一种<code>停-等式协议</code>(和<code>http</code>协议类似,每发送完一个分组就停止发送,等待对方的确认,在收到确认后再发送下一个分组),既然通过网络进行通信,就必然会有延迟,两种方案的网络通信时序图如下:</p>
<p><img src="/img/remote/1460000012513292" alt="alt text" title="alt text"></p>
<p>由于商品库存往往是最致命的热点,是整个服务的热点。如果采用第一种方案的话,<code>TPS</code>理论上可以提升<code>3rt/rt=3</code>倍。而这是在一个事务中只有3条SQL的情况,理论上多一条SQL就多一个rt时间。</p>
<p>另外,当更新操作到达数据库的那个点,才算加锁成功。<code>commit</code>到达数据库的时候才算解锁成功。所以,更新操作的前半个<code>rt</code>和<code>commit</code>操作的后半个<code>rt</code>都不计算在整个锁库存的时间内。</p>
<h2>性能优化</h2>
<p>从上面的例子可以看出,在一个事务操作中,将对最热点记录的操作放到事务的最后面,这样可以显著地提高服务的<code>吞吐量</code>。</p>
<h2>select for update 和 update where的最优选择</h2>
<p>我们可以将一些简单的判断逻辑写到update操作的谓词里面,这样可以减少加锁的时间,如下:</p>
<pre><code>方案一:
start transaction
num = select count from t_inventory where id=234 for update
if count >= 3:
update t_inventory set num=num-3 where id=234
commit
else:
rollback</code></pre>
<p>方案二:</p>
<pre><code>start transaction:
int affectedRows = update t_inventory set num=num-3 where id=234 and num>=3
if affectedRows > 0:
commit
else:
rollback</code></pre>
<p>延时图如下:<br><img src="/img/remote/1460000012513293" alt="alt text" title="alt text"></p>
<p>从上图可以看出,加了update谓词以后,一个事务少了1rt的锁记录时间(update谓词和select for update对记录加的都是X锁,所以效果是一样的)。</p>
<h2>死锁</h2>
<p>加锁SQL都或多或少会遇到这个问题。上面的最佳实践中,笔者建议在一个事务中,对记录的加锁按照记录的热点程度升序排列,对与任何会并发的SQL都必须按照相同的顺序来处理,否则会导致死锁,如下图:</p>
<p><img src="/img/remote/1460000012513294" alt="alt text" title="alt text"></p>
<h2>总结</h2>
<p>合理地写好SQL,对于我们提高系统的吞吐量至关重要。</p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000012513286">https://segmentfault.com/a/11...</a></p>
本地消息表实现最终一致性
https://segmentfault.com/a/1190000012415698
2017-12-13T21:52:09+08:00
2017-12-13T21:52:09+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
6
<h2>背景</h2>
<p>传统的单体应用不会横跨多个数据库,可以通过单机事务保证一致性。然而在海量数据的场景下,我需要对数据库做拆分,即分库分表,而<code>Cobar</code>、<code>MyCat</code>这类分库分表中间件并不提供分布式事务的特性,并且基于二阶段提交的分布式事务性能较差,对于大多数业务场景来说,并不需要强一致,只需要保证最终一致性即可。</p>
<h2>实践</h2>
<p>下面我们举个下订单的场景,总共有3个实体,<code>商品</code>、<code>用户</code>、<code>订单</code>,我们按照<code>user_id</code>来sharding。所以相同<code>user_id</code>的<code>用户</code>和<code>订单</code>在同一个物理库下,而<code>商品</code>表中不存在<code>user_id</code>,所以<code>商品</code>表在不同的物理库下。</p>
<p>下订单的场景,主要涉及到两个事务操作,<code>扣减库存</code>和<code>生成订单</code>,因为两个操作涉及不同的数据库,所以无法保证强一致性。</p>
<p>我们可以通过本地消息表,来实现最终一致性,具体流程如下图:</p>
<p><img src="/img/remote/1460000012415701" alt="alt text" title="alt text"></p>
<ul>
<li>调用外部服务,生成全局唯一的交易流水号<code>trans_id</code>。</li>
<li>事务一:1) 扣减库存 2) 根据流水单号,生成对应商品的冻结记录。消息表主要由<code>商品ID</code>、<code>交易流水号</code>、<code>冻结数</code>、<code>消息状态</code>这四个字段构成,因为消息表和商品表在同一个物理库下,所以<code>TX1</code>中的操作1和操作2是可以构成事务操作的。冻结记录的状态有三种:<code>已冻结</code>、<code>释放已售出</code>、<code>释放未售出</code>。冻结记录的初始状态为<code>已冻结</code>。</li>
<li>事务一如果成功,则进行事务二;如果事务一失败,则直接返回。</li>
<li>事务二:根据交易流水号<code>trans_id</code>生成订单,订单的状态有三种:<code>未支付</code>、<code>已支付</code>、<code>超时</code>,订单的初始状态为<code>未支付</code>。</li>
<li>若订单创建成功,则进行后续的支付流程。</li>
<li>如果事务二失败,由于网络抖动超时等原因,不一定是真的生成订单失败,即 在事务二失败的情况下,可能生成了订单,也可能确实没有生成订单。</li>
<li>定时任务一:设置一个每隔15分钟的定时任务(即一个订单必须在15分钟内完成支付),从订单表里捞出最近半小时内的所有订单,对每一个订单做如下处理:若订单超时<code>未支付</code>,开启事务<code>SELECT FOR UPDATE </code>锁住该订单,即用悲观锁阻止用户对订单进行支付等操作,然后通过订单的<code>trans_id</code>去冻结表更新对应冻结记录的状态,置为<code>释放未售出</code>,并回滚商品数量,回滚商品的操作完成后,将订单状态置为<code>超时</code>,若事务中调用的回滚商品数量的服务失败,则可以发出报警人工处理,或通过更长时间的定时任务去处理;若订单为<code>已支付</code>,则将冻结表中记录的状态置为<code>释放已售出</code>。</li>
<li>定时任务二:因为存在<code>事务一</code>成功,而<code>事务二</code>的订单确实没有创建成功的情况,这样会冻结一部分商品的数量,所以可以捞取出 创建超过10分钟 状态为<code>已冻结</code>的所有冻结记录,根据每个冻结记录的<code>trans_id</code>去订单表查询,若不存在对应的订单,则将冻结记录的状态更新为<code>释放未售出</code>,并回滚商品数量。</li>
<li>另一个需要注意的点,在定时任务一中,对于超时<code>未支付</code>的订单,会先回滚冻结表,然后将订单状态置为<code>超时</code>,但这最后一步将订单置为<code>超时</code>可能会失败,这样会出现不一致的状态,即订单状态为<code>未支付</code>,而冻结记录的状态为<code>释放未售出</code>。所以,在支付的时候需要做一个前置校验,检查冻结记录的状态是否为<code>已冻结</code>,若不是,则拒绝支付。</li>
</ul>
<h2>变种</h2>
<p>在上面这种模型的基础上,还有一种变种,如下图:</p>
<p><img src="/img/remote/1460000012415702" alt="alt text" title="alt text"></p>
<p>即在<code>TX2</code>失败的情况下,跳转到<code>TX3</code>。</p>
<ul>
<li>根据<code>trans_id</code>查询订单,若订单不存在,则直接将冻结记录置为<code>释放未售出</code>,并回滚库存;若订单存在,则说明<code>TX2</code>因为网络抖动等原因而失败,其实订单创建成功,则进行正常的支付流程。</li>
<li>需要注意的是:根据<code>trans_id</code>查询订单的时候,一定要<code>开启事务</code>,这样才会强制走主库,若不开启事务,则会走备库,因为<code>MySQL</code>主从同步延迟的问题,备库很可能无法查询到订单,从而回滚库存,这显然是错误的。</li>
</ul>
<h4>变种的优点</h4>
<p>将定时任务的压力均匀地分配到每一次调用中,提高数据库的可用性。</p>
<h2>总结</h2>
<p>在不需要强一致性的业务场景下,都可以通过<code>定时任务</code>+<code>幂等操作</code>来实现最终一致性。</p>
<p>以上。</p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000012415698">https://segmentfault.com/a/11...</a></p>
Http请求连接池-HttpClient的AbstractConnPool源码分析
https://segmentfault.com/a/1190000012009507
2017-11-15T00:31:59+08:00
2017-11-15T00:31:59+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
0
<h2>背景</h2>
<p>在做服务化拆分的时候,若不是性能要求特别高的场景,我们一般对外暴露<code>Http</code>服务。<code>Spring</code>里提供了一个模板类<code>RestTemplate</code>,通过配置<code>RestTemplate</code>,我们可以快速地访问外部的<code>Http</code>服务。<code>Http</code>底层是通过<code>Tcp</code>的三次握手建立连接的,若每个请求都要重新建立连接,那开销是很大的,特别是对于消息体非常小的场景,开销更大。</p>
<p>若使用连接池的方式,来管理连接对象,能极大地提高服务的吞吐量。</p>
<p><code>RestTemplate</code>底层是封装了<code>HttpClient</code>(笔者的版本是4.3.6),它提供了连接池机制来处理高并发网络请求。</p>
<h2>示例</h2>
<p>通常,我们采用如下的样板代码来构建<code>HttpClient</code>:</p>
<pre><code> HttpClientBuilder builder = HttpClientBuilder.create();
builder.setMaxConnTotal(maxConnections).setMaxConnPerRoute(maxConnectionsPerRoute);
if (!connectionReuse) {
builder.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE);
}
if (!automaticRetry) {
builder.disableAutomaticRetries();
}
if (!compress) {
builder.disableContentCompression();
}
HttpClient httpClient = builder.build();</code></pre>
<p>从上面的代码可以看出,<code>HttpClient</code>使用建造者设计模式来构造对象,最后一行代码构建对象,前面的代码是用来设置客户端的最大连接数、单路由最大连接数、是否使用长连接、压缩等特性。</p>
<h2>源码分析</h2>
<p>我们进入HttpClientBuilder的<code>build()</code>方法,会看到如下代码:</p>
<pre><code> # 构造Http连接池管理器
final PoolingHttpClientConnectionManager poolingmgr = new PoolingHttpClientConnectionManager(
RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.getSocketFactory())
.register("https", sslSocketFactory)
.build());
if (defaultSocketConfig != null) {
poolingmgr.setDefaultSocketConfig(defaultSocketConfig);
}
if (defaultConnectionConfig != null) {
poolingmgr.setDefaultConnectionConfig(defaultConnectionConfig);
}
if (systemProperties) {
String s = System.getProperty("http.keepAlive", "true");
if ("true".equalsIgnoreCase(s)) {
s = System.getProperty("http.maxConnections", "5");
final int max = Integer.parseInt(s);
poolingmgr.setDefaultMaxPerRoute(max);
poolingmgr.setMaxTotal(2 * max);
}
}
if (maxConnTotal > 0) {
poolingmgr.setMaxTotal(maxConnTotal);
}
if (maxConnPerRoute > 0) {
poolingmgr.setDefaultMaxPerRoute(maxConnPerRoute);
}
# Http连接管理器采用连接池的方式实现
connManager = poolingmgr;</code></pre>
<p>默认情况下构造出的<code>Http</code>连接管理器是采用连接池的方式实现的。</p>
<p>我们进入<code> PoolingHttpClientConnectionManager</code>的代码,其连接池的核心实现是依赖于<code> CPool</code>类,而<code> CPool</code>又继承了抽象类<code>AbstractConnPool</code>,<code> AbstractConnPool</code>有<code>@ThreadSafe</code>的注解,说明它是线程安全类,所以<code> HttpClient</code>线程安全地获取、释放连接都依赖于<code> AbstractConnPool</code>。</p>
<p>接下来我来看最核心的<code>AbstractConnPool</code>类,以下是连接池的结构图:</p>
<p><img src="/img/remote/1460000012009512?w=690&h=376" alt="alt text" title="alt text"></p>
<p>连接池最重要的两个公有方法是<code> lease</code>和<code>release</code>,即获取连接和释放连接的两个方法。</p>
<h4>lease 获取连接</h4>
<pre><code> @Override
public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
Args.notNull(route, "Route");
Asserts.check(!this.isShutDown, "Connection pool shut down");
return new PoolEntryFuture<E>(this.lock, callback) {
@Override
public E getPoolEntry(
final long timeout,
final TimeUnit tunit)
throws InterruptedException, TimeoutException, IOException {
final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
onLease(entry);
return entry;
}
};
}</code></pre>
<p><code> lease</code>方法返回的是一个<code> Future</code>对象,即需要调用<code> Future</code>的<code>get</code>方法,才可以得到<code>PoolEntry</code>的对象,它包含了一个连接的具体信息。</p>
<p>而获取连接是通过<code> getPoolEntryBlocking</code>方法实现的,通过函数名可以知道,这是一个阻塞的方法,即该<code>route</code>所对应的连接池中的连接不够用时,该方法就会阻塞,直到该<code> route</code>所对应的连接池有连接释放,方法才会被唤醒;或者方法一直等待,直到连接超时抛出异常。</p>
<pre><code> private E getPoolEntryBlocking(
final T route, final Object state,
final long timeout, final TimeUnit tunit,
final PoolEntryFuture<E> future)
throws IOException, InterruptedException, TimeoutException {
Date deadline = null;
// 设置连接超时时间戳
if (timeout > 0) {
deadline = new Date
(System.currentTimeMillis() + tunit.toMillis(timeout));
}
// 获取连接,并修改修改连接池,所以加锁--->线程安全
this.lock.lock();
try {
// 从Map中获取该route对应的连接池,若Map中没有,则创建该route对应的连接池
final RouteSpecificPool<T, C, E> pool = getPool(route);
E entry = null;
while (entry == null) {
Asserts.check(!this.isShutDown, "Connection pool shut down");
for (;;) {
// 获取 同一状态的 空闲连接,即从available链表的头部中移除,添加到leased集合中
entry = pool.getFree(state);
// 若返回连接为空,跳出循环
if (entry == null) {
break;
}
// 若连接已过期,则关闭连接
if (entry.isExpired(System.currentTimeMillis())) {
entry.close();
} else if (this.validateAfterInactivity > 0) {
if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
if (!validate(entry)) {
entry.close();
}
}
}
if (entry.isClosed()) {
// 若该连接已关闭,则总的available链表中删除该连接
this.available.remove(entry);
// 从该route对应的连接池的leased集合中删除该连接,并且不回收到available链表中
pool.free(entry, false);
} else {
break;
}
}
// 跳出for循环
if (entry != null) {
// 若获取的连接不为空,将连接从总的available链表移除,并添加到leased集合中
// 获取连接成功,直接返回
this.available.remove(entry);
this.leased.add(entry);
onReuse(entry);
return entry;
}
// 计算该route的最大连接数
// New connection is needed
final int maxPerRoute = getMax(route);
// Shrink the pool prior to allocating a new connection
// 计算该route连接池中的连接数 是否 大于等于 route最大连接数
final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
// 若大于等于 route最大连接数,则收缩该route的连接池
if (excess > 0) {
for (int i = 0; i < excess; i++) {
// 获取该route连接池中最不常用的空闲连接,即available链表末尾的连接
// 因为回收连接时,总是将连接添加到available链表的头部,所以链表尾部的连接是最有可能过期的
final E lastUsed = pool.getLastUsed();
if (lastUsed == null) {
break;
}
// 关闭连接,并从总的空闲链表以及route对应的连接池中删除
lastUsed.close();
this.available.remove(lastUsed);
pool.remove(lastUsed);
}
}
// 该route的连接池大小 小于 route最大连接数
if (pool.getAllocatedCount() < maxPerRoute) {
final int totalUsed = this.leased.size();
final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
if (freeCapacity > 0) {
final int totalAvailable = this.available.size();
// 总的空闲连接数 大于等于 总的连接池剩余容量
if (totalAvailable > freeCapacity - 1) {
if (!this.available.isEmpty()) {
// 从总的available链表中 以及 route对应的连接池中 删除连接,并关闭连接
final E lastUsed = this.available.removeLast();
lastUsed.close();
final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
otherpool.remove(lastUsed);
}
}
// 创建新连接,并添加到总的leased集合以及route连接池的leased集合中,函数返回
final C conn = this.connFactory.create(route);
entry = pool.add(conn);
this.leased.add(entry);
return entry;
}
}
//route的连接池已满,无法分配连接
boolean success = false;
try {
// 将该获取连接的任务放入pending队列
pool.queue(future);
this.pending.add(future);
// 阻塞等待,若在超时之前被唤醒,则返回true;若直到超时才返回,则返回false
success = future.await(deadline);
} finally {
// In case of 'success', we were woken up by the
// connection pool and should now have a connection
// waiting for us, or else we're shutting down.
// Just continue in the loop, both cases are checked.
// 无论是 被唤醒返回、超时返回 还是被 中断异常返回,都会进入finally代码段
// 从pending队列中移除
pool.unqueue(future);
this.pending.remove(future);
}
// check for spurious wakeup vs. timeout
// 判断是伪唤醒 还是 连接超时
// 若是 连接超时,则跳出while循环,并抛出 连接超时的异常;
// 若是 伪唤醒,则继续循环获取连接
if (!success && (deadline != null) &&
(deadline.getTime() <= System.currentTimeMillis())) {
break;
}
}
throw new TimeoutException("Timeout waiting for connection");
} finally {
// 释放锁
this.lock.unlock();
}
}</code></pre>
<h4>release 释放连接</h4>
<pre><code> @Override
public void release(final E entry, final boolean reusable) {
// 获取锁
this.lock.lock();
try {
// 从总的leased集合中移除连接
if (this.leased.remove(entry)) {
final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
// 回收连接
pool.free(entry, reusable);
if (reusable && !this.isShutDown) {
this.available.addFirst(entry);
onRelease(entry);
} else {
entry.close();
}
// 获取pending队列队头的任务(先进先出原则),唤醒该阻塞的任务
PoolEntryFuture<E> future = pool.nextPending();
if (future != null) {
this.pending.remove(future);
} else {
future = this.pending.poll();
}
if (future != null) {
future.wakeup();
}
}
} finally {
// 释放锁
this.lock.unlock();
}
}</code></pre>
<h2>总结</h2>
<p><code> AbstractConnPool</code>其实就是通过在获取连接、释放连接时加锁,来实现线程安全,思路非常简单,但它没有在route对应的连接池中加锁对象,即<code> RouteSpecificPool</code>的获取连接、释放连接操作是不加锁的,因为已经在<code> AbstractConnPool</code>的外部调用中加锁,所以是线程安全的,简化了设计。</p>
<p>另外每个<code>route</code>对应一个连接池,实现了在<code>host</code>级别的隔离,若下游的某台提供服务的主机挂了,无效的连接最多只占用该<code>route</code>对应的连接池,不会占用整个连接池,从而拖垮整个服务。</p>
<p>以上。</p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000012009507">https://segmentfault.com/a/11...</a></p>
Hadoop、Hbase伪分布式安装
https://segmentfault.com/a/1190000011987614
2017-11-13T17:23:10+08:00
2017-11-13T17:23:10+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
0
<h2>环境</h2>
<p>本文介绍Hadoop、Hbase的伪分布式安装。</p>
<ul>
<li>操作系统: Centos7</li>
<li>Hadoop: 2.7.3</li>
<li>Hbase: 1.2.3</li>
</ul>
<h2>Hadoop安装</h2>
<h4>JAVA_HOME环境变量配置</h4>
<p>由于Hbase是基于HDFS的,所以我们首先部署Hadoop。</p>
<p>下载jdk7和hadoop-2.7.3</p>
<pre><code>cd hadoop-2.7.3/etc/hadoop
vim hadoop-env.sh</code></pre>
<p>打开文件后,搜索这一行</p>
<pre><code>export JAVA_HOME</code></pre>
<p>因为默认是使用系统<code>JAVA_HOME</code>,所以要修改成我们下载的jdk,以笔者的环境为例:</p>
<pre><code>export JAVA_HOME=/home/pay/soft/java/jre</code></pre>
<p>设置完以后,运行如下命令</p>
<pre><code>bin/hadoop</code></pre>
<p>如果出现usage文档,即配置成功。</p>
<h4>HDFS配置</h4>
<p>在<code>etc/hadoop/core-site.xml</code>的<code><configuration></code>标签内做如下配置:</p>
<pre><code><property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property></code></pre>
<p>接着在<code>etc/hadoop/hdfs-site.xml</code>的<code><configuration></code>标签内做如下配置:</p>
<pre><code><property>
<name>dfs.replication</name>
<value>1</value>
</property>
<!-- Hadoop 的Web UI端口配置 -->
<property>
<name>dfs.http.address</name>
<value>0.0.0.0:8777</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/usr/local/hadoop/tmp/dfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/usr/local/hadoop/tmp/dfs/data</value>
</property></code></pre>
<p>默认的Hadoop的Web UI端口是50070,若你的机器做了端口限制,可以配置<code>dfs.http.address</code>的新的值,从而可以从浏览器访问Web UI。</p>
<h4>配置SSH的免key登陆</h4>
<pre><code>ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 0600 ~/.ssh/authorized_keys</code></pre>
<p>由于Hadoop节点之间通过SSH通信,为了避免连接过程中人工输入密码,所以要配置SSH的免key登陆,由于本文讲得是伪分布式,因此只需要对本机localhost免key登陆即可。</p>
<p>完成以上命令以后,输入以下命令,若不需要输入密码,则说明配置。</p>
<pre><code>ssh localhost</code></pre>
<h4>运行</h4>
<p>对文件系统格式化:</p>
<pre><code>bin/hdfs namenode -format</code></pre>
<p>起文件系统:</p>
<pre><code>sbin/start-dfs.sh</code></pre>
<p>检查HDFS是否起了,输入<code>jps</code>,看控制台输出</p>
<pre><code>7923 Jps
15782 NodeManager
22139 DataNode
22683 SecondaryNameNode
21842 NameNode
25072 -- process information unavailable
15380 ResourceManager</code></pre>
<p>若<code>DataNode</code>、<code>NameNode</code>、<code>SecondaryNameNode</code>、<code>NodeManager</code>、<code>ResourceManager</code>这几个进程都起来了,则说明HDFS部署成功。</p>
<p>再访问WEB UI界面,输入<code>http://yourUrl:8777/</code>,yourUrl服务器的地址,浏览器会出现如下界面:</p>
<p><img src="/img/remote/1460000011987619" alt="alt text" title="alt text"></p>
<p>在HDFS中创建文件夹,并将文件上传到HDFS:</p>
<pre><code>bin/hadoop fs -mkdir /usr
bin/hadoop fs -mkdir /usr/test
./hadoop fs -put /home/pin/hadoop-2.7.3/etc/hadoop/hdfs-site.xml /usr/test</code></pre>
<h4>YARN监控任务</h4>
<p>对<code>etc/hadoop/mapred-site.xml</code>如下配置:</p>
<pre><code><configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration></code></pre>
<p>再对<code>etc/hadoop/mapred-site.xml</code>做如下配置:</p>
<pre><code><configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration></code></pre>
<p>配置完以后,执行脚本起YARN服务:</p>
<pre><code>sbin/start-yarn.sh</code></pre>
<p>用浏览器访问<code>http://yourUrl:8088</code>即可看到资源和节点的管理信息。如下:</p>
<p><img src="/img/remote/1460000011987620" alt="alt text" title="alt text"></p>
<h4>运行示例程序</h4>
<p>运行如下命令:</p>
<pre><code>bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar grep /usr/test /usr/output 'dfs[a-z.]+'</code></pre>
<p>示例程序运行一个正则统计程序,我们可以通过访问<code>http://yourUrl:8088</code>来观察job的运行情况,job结束以后,我们输入以下命令:</p>
<pre><code>>> bin/hadoop fs -cat /usr/output/*
1 dfs.replication
1 dfs.namenode.name.dir
1 dfs.http.address
1 dfs.datanode.data.dir</code></pre>
<p>以上即被正则统计到的字符串以及出现的次数。</p>
<p>至此,Hadoop的安装完成。</p>
<h2>Hbase安装</h2>
<p>解压Hbase的安装包,修改<code>conf/hbase-env.sh</code>文件,如下:</p>
<pre><code>export JAVA_HOME=/home/pay/soft/java
export HBASE_CLASSPATH=/home/pay/pin/hbase-1.2.3/conf
export HBASE_MANAGES_ZK=true</code></pre>
<p>再修改<code>conf/hbase-site.xml</code>文件,如下:</p>
<pre><code><configuration>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.rootdir</name>
<value>hdfs://localhost:9000/hbase</value>
</property>
<!-- Hbase的web UI端口 -->
<property>
<name>hbase.master.info.port</name>
<value>8171</value>
</property>
</configuration></code></pre>
<p>启动Hbase:</p>
<pre><code>./bin/start-hbase.sh</code></pre>
<p>在HDFS上查看Hbase文件,如下则创建成功</p>
<pre><code>>> hadoop fs -ls /hbase
drwxr-xr-x - root supergroup 0 2017-11-13 15:41 /hbase/.tmp
drwxr-xr-x - root supergroup 0 2017-11-13 15:41 /hbase/MasterProcWALs
drwxr-xr-x - root supergroup 0 2017-11-13 15:42 /hbase/WALs
drwxr-xr-x - root supergroup 0 2017-11-13 15:56 /hbase/archive
drwxr-xr-x - root supergroup 0 2017-11-13 15:22 /hbase/data
-rw-r--r-- 3 root supergroup 42 2017-11-13 15:22 /hbase/hbase.id
-rw-r--r-- 3 root supergroup 7 2017-11-13 15:22 /hbase/hbase.version
drwxr-xr-x - root supergroup 0 2017-11-13 16:30 /hbase/oldWALs</code></pre>
<p>输入<code>jps</code>命令,查看进程</p>
<pre><code>27137 HRegionServer
15782 NodeManager
22139 DataNode
22683 SecondaryNameNode
26844 HQuorumPeer
21842 NameNode
25072 -- process information unavailable
27607 Jps
15380 ResourceManager
26959 HMaster</code></pre>
<p>若输出以上的信息,则说明Hadoop和Hbase都创建成功。</p>
<p>以上。</p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000011987614">https://segmentfault.com/a/11...</a></p>
B+树的正确姿势
https://segmentfault.com/a/1190000011651035
2017-10-20T18:20:56+08:00
2017-10-20T18:20:56+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
0
<h2>背景</h2>
<p>用过<code>MySQL</code>的同学都知道高效查询需要走索引,否则全表读取会导致慢SQL。<code>InnoDB</code>的索引是采用<code>B+树</code>实现的。网络和书本上关于<code>B+树</code>的定义各不相同,读者们可能都分辨不清哪个是准确的定义。</p>
<h2>定义</h2>
<p>笔者按照《数据库系统概念》(Database System Concepts)这本书上的概念,准确定义<code>B+树</code>。</p>
<p><code>B+树</code>采用的是平衡树结构,从根节点到每个叶子节点的路劲长度都是相同的,我们给每棵树定义<code>n</code>,<code>n</code>是固定不变的,下图是<code>B+树</code>节点全满状态的结构:</p>
<p><img src="/img/remote/1460000011651038?w=690&h=144" alt="alt text" title="alt text"><br>P表示指针,K表示关键字,且如果<code>i < j</code>,则<code>K<sub>i</sub> < K<sub>j</sub></code>(假设没有重复的关键字)。</p>
<p>对于叶节点,<code>i = 1,2,···,n-1</code>, 指针<code>P<sub>i</sub></code>指向具有关键字<code>K<sub>i</sub></code>的一条文件记录,指针<code>P<sub>n</sub></code>指向后一个叶节点,这样所有的叶节点按键值大小顺序串成一个链表,可以高效地进行顺序处理。</p>
<p>非叶节点的结构与叶节点相同,只不过非叶节点的指针都是指向树中的节点。假设有<code>K<sub>i-1</sub>,P<sub>i</sub>,K<sub>i</sub></code>,则指针<code>P<sub>i</sub></code>指向的子树中的关键字值大于等于<code>K<sub>i-1</sub></code>,小于<code>K<sub>i</sub></code>;<code>P<sub>1</sub></code>指向的子树的关键字值,小于<code>K<sub>1</sub></code>;<code>P<sub>n</sub></code>指向的子树的关键字值都大于等于<code>K<sub>n-1</sub></code>。</p>
<ul>
<li><p>对任意节点,指针数 = 关键字数 + 1</p></li>
<li><p>对于任意非叶节点,其指针数必须满足<code>[ceil(n/2), n]</code></p></li>
<li><p>若非叶节点是根节点,则其指针数可以小于<code>ceil(n/2)</code>,但至少包含两个指针,除非整棵树只有一个节点</p></li>
<li><p>对于任意叶节点,其关键字数必须满足<code>[ceil((n-1)/2), n-1]</code></p></li>
<li><p>若叶节点是根节点,则其关键字数可以小于<code>ceil((n-1)/2)</code></p></li>
</ul>
<h2>更新</h2>
<p>关于<code>B+</code>树的查找、插入、删除操作,请参考本人<code>github</code>:</p>
<p><a href="https://link.segmentfault.com/?enc=5ynRnRdJu8b7VvgXyEHh5A%3D%3D.1Sumid6HuEqGgTBGHMD83rph8ete4gt9PnV9%2BQoLjc92HR85YwrU41nC4YFw9RyQ" rel="nofollow">https://github.com/butterflyl...</a></p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000011651035">https://segmentfault.com/a/11...</a></p>
Java正则的贪婪和非贪婪模式
https://segmentfault.com/a/1190000011386427
2017-09-27T19:25:09+08:00
2017-09-27T19:25:09+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
0
<h2>定义</h2>
<ul>
<li>贪婪模式:匹配尽可能多的字符</li>
<li>非贪婪模式:匹配尽可能少的字符</li>
</ul>
<p>在<code>Java</code>的正则表达式中,通过在修饰匹配次数的符号后面加一个<code>?</code>,即非贪婪模式,默认情况下是贪婪模式。</p>
<p>表示匹配次数的符号有:</p>
<pre><code>.? # 任意字符匹配1次或0次
.* # 任意字符匹配0次或多次
.+ # 任意字符匹配1次或多次
.{n} # 任意字符匹配n次
.{n,} # 任意字符匹配至少n次
.{n,m} # 任意字符匹配至少n次,至多m次</code></pre>
<h2>代码</h2>
<pre><code>public static void main(String[] args) {
String input = "aaaabc";
String regex1 = "a{2,3}"; // 贪婪模式
Pattern p1 = Pattern.compile(regex1);
Matcher m1 = p1.matcher(input);
while (m1.find()) {
System.out.println(m1.group());
}
System.out.println("------------------");
String regex2 = "a{2,3}?"; // 非贪婪模式
Pattern p2 = Pattern.compile(regex2);
Matcher m2 = p2.matcher(input);
while (m2.find()) {
System.out.println(m2.group());
}
}</code></pre>
<p>输出:</p>
<pre><code>aaa
------------------
aa
aa</code></pre>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000011386427">https://segmentfault.com/a/11...</a><br> </p>
Cobar源码解析(二)
https://segmentfault.com/a/1190000011354600
2017-09-26T00:50:21+08:00
2017-09-26T00:50:21+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
1
<h2>报文格式</h2>
<p>这一节我们来讲<code>Cobar Handshake</code>的过程。</p>
<p>MySQL服务端和客户端交互的所有的包格式都是统一的,报文格式如下图:</p>
<p><img src="/img/remote/1460000011354605" alt="alt text" title="alt text"></p>
<p>MySQL报文的消息头共有4个字节,前3字节表示的是实际数据的长度(不包含消息头),并且字节是按照小端模式排放的。</p>
<p>第四个字节MySQL为了防止串包用的,其原理是每收到一个报文,都在<code>sequence id</code>上加1。如果数据库检测到<code>sequence id</code>是连续的,则表明没有串包,如果不连续,则表示串包,数据库会直接丢弃该连接。</p>
<pre><code>小端模式就是低位字节排放在内存的低地址端,高位字节排放在内存的高地址端。
大端模式则相反。</code></pre>
<p>下面是<code>Handshake</code>包的结构,括号内表示该字段的字节数:</p>
<p><img src="/img/remote/1460000011354606" alt="alt text" title="alt text"></p>
<p><code>seed</code>部分是加密种子,分为前后两个部分,通过随机数生成。</p>
<h2>源码分析</h2>
<p>上一节我们分析到,当一个前端连接过来,并不是直接和<code>selector</code>绑定,而是先插入到<code>R</code>线程的注册队列中,这样能释放<code>NIOAcceptor</code>的压力,处理更多前端连接。所以,连接和<code>selector</code>的绑定过程是在<code>R</code>线程中进行的,由register方法实现,代码如下:</p>
<pre><code>private void register(Selector selector) {
NIOConnection c = null;
while ((c = registerQueue.poll()) != null) {
try {
c.register(selector);
} catch (Throwable e) {
c.error(ErrorCode.ERR_REGISTER, e);
}
}
}</code></pre>
<p>实际的绑定操作是由<code>NIOConnection</code>的<code>register</code>方法实现的,<code>NIOConnection</code>接口的抽象类是<code>AbstractConnection</code>,我们来看它实现的<code>register</code>方法:</p>
<pre><code>@Override
public void register(Selector selector) throws IOException {
try {
// 该连接只监听socket可读事件
processKey = channel.register(selector, SelectionKey.OP_READ, this);
isRegistered = true;
} finally {
if (isClosed.get()) {
clearSelectionKey();
}
}
}</code></pre>
<p>我们发现,前端连接注册选择器时,只监听了可读事件。这是考虑到,<code>Java</code>的<code>NIO</code>属于水平触发LT(只要满足条件,就触发一个事件),使用水平触发时,如果应用程序不需要写就不要关注socket可写的事件,否则就会无限次地立即返回<code>write ready notification</code>,长期关注socket可写事件会出现CPU打满的情况,所以在使用JDK的NIO编程时,如果没有数据往外写,就取消写事件,有数据往外写时再注册写事件。</p>
<p><code>FrontendConnection</code>继承了<code>AbstractConnection</code>,它又重新实现了<code>register</code>方法,代码如下:</p>
<pre><code>@Override
public void register(Selector selector) throws IOException {
// 调用父类的register方法
super.register(selector);
if (!isClosed.get()) {
// 生成认证数据
byte[] rand1 = RandomUtil.randomBytes(8);
byte[] rand2 = RandomUtil.randomBytes(12);
// 保存认证数据
byte[] seed = new byte[rand1.length + rand2.length];
System.arraycopy(rand1, 0, seed, 0, rand1.length);
System.arraycopy(rand2, 0, seed, rand1.length, rand2.length);
this.seed = seed;
// 发送握手数据包
HandshakePacket hs = new HandshakePacket();
hs.packetId = 0;
hs.protocolVersion = Versions.PROTOCOL_VERSION;
hs.serverVersion = Versions.SERVER_VERSION;
hs.threadId = id;
hs.seed = rand1;
hs.serverCapabilities = getServerCapabilities();
hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
hs.serverStatus = 2;
hs.restOfScrambleBuff = rand2;
// 异步写入Handshake包
hs.write(this);
}
}</code></pre>
<p>该方法生成了HandShake包,和上面结构图相一致,关键是最后异步写入<code>HandShake</code>包的<code>write</code>方法,代码如下:</p>
<pre><code>public void write(FrontendConnection c) {
// 分配缓存
ByteBuffer buffer = c.allocate();
// 将HandShake包写入缓存
BufferUtil.writeUB3(buffer, calcPacketSize());
buffer.put(packetId);
buffer.put(protocolVersion);
BufferUtil.writeWithNull(buffer, serverVersion);
BufferUtil.writeUB4(buffer, threadId);
BufferUtil.writeWithNull(buffer, seed);
BufferUtil.writeUB2(buffer, serverCapabilities);
buffer.put(serverCharsetIndex);
BufferUtil.writeUB2(buffer, serverStatus);
buffer.put(FILLER_13);
// buffer.position(buffer.position() + 13);
BufferUtil.writeWithNull(buffer, restOfScrambleBuff);
// 将ByteBuffer中的数据异步写入Socket
c.write(buffer);
}</code></pre>
<p>我们再来看最后一行的<code>write</code>方法:</p>
<pre><code>@Override
public void write(ByteBuffer buffer) {
// 检查连接是否关闭,若关闭则将缓存回收
if (isClosed.get()) {
processor.getBufferPool().recycle(buffer);
return;
}
if (isRegistered) {
try {
// 将缓存先插入对队列中,其实就是一个循环数组,如数组已满,则 wait;
// 这个队列是AbstractConnection的一个成员变量
writeQueue.put(buffer);
} catch (InterruptedException e) {
error(ErrorCode.ERR_PUT_WRITE_QUEUE, e);
return;
}
// 插入队列后,调用NIOProcessor的postWrite方法,其实就是NIOReacor的postWrite方法
processor.postWrite(this);
} else {
// 若连接未注册,也回收缓存
processor.getBufferPool().recycle(buffer);
close();
}
}</code></pre>
<p>我们看<code>NIOReactor</code>的postWrite方法:</p>
<pre><code>final void postWrite(NIOConnection c) {
reactorW.writeQueue.offer(c);
}</code></pre>
<p>其实是将连接插入到<code>W</code>线程的<code>writeQueue</code>阻塞队列中,我们再来看<code>W</code>线程的<code>run</code>方法,</p>
<pre><code>@Override
public void run() {
NIOConnection c = null;
for (;;) {
try {
if ((c = writeQueue.take()) != null) {
write(c);
}
} catch (Throwable e) {
LOGGER.warn(name, e);
}
}
}
private void write(NIOConnection c) {
try {
c.writeByQueue();
} catch (Throwable e) {
c.error(ErrorCode.ERR_WRITE_BY_QUEUE, e);
}
}</code></pre>
<p>轮询阻塞队列,若队列不为空,则取出连接,基于队列写方法<code>writeByQueue</code>将缓存中的数据写入socket,下一节再分析<code>writeByQueue</code>方法。</p>
<h2>总结</h2>
<p>阅读源码后,发现<code>Cobar</code>从前端连接的<code>accept</code>并注册<code>selector</code>到发送<code>Handshake</code>包都是异步,本质是将连接插入到<code>R</code>线程和<code>W</code>线程的阻塞队列中,不立即进行注册和写操作,从而实现整个过程的异步化,提高了<code>Cobar</code>的吞吐量。</p>
<p>以上。</p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000011354600">https://segmentfault.com/a/11...</a></p>
分布式调用跟踪实战
https://segmentfault.com/a/1190000011265918
2017-09-19T22:15:00+08:00
2017-09-19T22:15:00+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
2
<h2>背景</h2>
<p>分布式环境下,跨服务之间的调用错综复杂,如果突然爆出一个错误,虽然有日志记录,但到底是哪个服务出了问题呢?是移动端传的参数有错误,还是系统X或者系统Y提供的接口导致?在这种情况下,错误排查起来就非常费劲。</p>
<p>为了追踪一个请求完整的流转过程,我可以给请求分配一个唯一的<code>traceId</code>,当请求调用其他服务时,我们传递这个<code>traceId</code>。在输出日志时,将这个<code>traceId</code>打印到日志文件中,这样,从日志文件中,根据<code>traceId</code>就可以分析一个请求完整的调用过程,若更进一步,还可以做性能分析。</p>
<h2>TraceID在Http服务中的实现</h2>
<p>在一个服务的内部,我们不希望在调用每个方法时,都带上<code>traceId</code>这个参数(这样实在太蠢了- . -)。</p>
<p>在Java中,我们一般将<code>traceId</code>放到<code>ThreadLocal</code>中,这样在打印日志时,日志框架从<code>ThreadLocal</code>取出<code>traceId</code>,和其他需要打印的信息一起打印出来。这样对框架的使用者来说,<code>traceId</code>就是透明的,并不需要去关注它。</p>
<p>我们来看代码实现:</p>
<pre><code>/**
* 建立日志MDC上下文属性的拦截器
*/
public class WebLogMdcHandlerInterceptor extends HandlerInterceptorAdapter {
/**
* traceId一般由前端的负载生成,比如Nignx
*/
private boolean generateTraceId = false;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String ctxTraceId = null;
String ctxOpId = null;
// 判断Http header中是否有traceId字段,如果没有,则通过随机数生成
if (StringUtils.isNotBlank(request.getHeader(Conventions.TRACE_ID_HEADER))) {
ctxTraceId = request.getHeader(Conventions.TRACE_ID_HEADER);
} else if (generateTraceId) {
ctxTraceId = getTraceId();
}
ctxOpId = UUID.randomUUID().toString();
MDC.put(Conventions.CTX_TRACE_ID_MDC, ctxTraceId + "," + ctxOpId);
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
MDC.clear();
}
// 通过随机数生成traceId,也可以通过其他方式实现,只要保证唯一即可
private static String getTraceId() {
Random random = new Random();
String rs1 = String.valueOf(random.nextInt(10000));
String rs2 = String.valueOf(random.nextInt(10000));
return rs1 + rs2;
}
public void setGenerateTraceId(boolean generateTraceId) {
this.generateTraceId = generateTraceId;
}
}</code></pre>
<p>实现其实比较简单,使用<code>MDC</code>(Mapped Diagnostic Contexts)来实现,<code>logback</code>和<code>log4j</code>支持<code>MDC</code>,<code>MDC</code>的底层实现其实很容易理解,就是通过<code>ThreadLocal</code>来维护<code>key-value</code>,源码如下:</p>
<pre><code>public final class LogbackMDCAdapter implements MDCAdapter {
final InheritableThreadLocal<Map<String, String>> copyOnInheritThreadLocal = new InheritableThreadLocal<Map<String, String>>();
...
...
}</code></pre>
<p><code>WebLogMdcHandlerInterceptor</code>继承了<code>HandlerInterceptorAdapter</code>,<code>HandlerInterceptorAdapter</code>是一个拦截器适配器,我们实现了它其中的2个方法:</p>
<ul>
<li>preHandle: 实现处理器的预处理</li>
<li>afterCompletion: 整个请求处理完毕回调方法,可以进行一些资源清理</li>
</ul>
<p>我们在<code>afterCompletion</code>方法中对<code>MDC</code>进行了<code>clear</code>操作,底层调用了<code>ThreadLocal</code>的<code>remove</code>方法,清除当前线程中的线程局部变量。其作用有两个,一是防止<code>ThreadLocal</code>导致的内存溢出,二是<code>Tomcat</code>容器线程复用时,新请求会依旧使用原来的<code>MDC</code>中的<code>traceId</code>,会导致<code>traceId</code>的"串码"现象。</p>
<p>我们再来讲一下<code>preHandle</code>方法中的<code>ctxOpId</code>,即我们向<code>MDC</code>中不仅仅写入<code>http header</code>中的<code>traceId</code>,还通过UUID生成了一个<code>ctxOpId</code>。</p>
<p><img src="/img/remote/1460000011265923" alt="alt text" title="alt text"></p>
<p>如上图,A服务的某个方法连续调用了B服务的某个接口3次(可能是重试机制导致,也有可能确实是业务逻辑),如何区分这3次调用呢?只通过<code>traceId</code>无法区分,因为这三次的<code>traceId</code>都相同,所以每次调用时UUID生成<code>ctxOpId</code>,来区分这三次调用。</p>
<p>然后在logback.xml文件中配置<code>pattern</code>,如下:</p>
<pre><code><pattern>%d %-5level [%X{ctxTraceId}][%thread] %logger{5} - %msg%n</pattern></code></pre>
<p>具体打印日志时,会根据<code>pattern</code>格式打印,各字段的含义可自行百度。</p>
<p>最后,当我们在调用其他Http服务时,先获取当前线程的<code>ThreadLocal</code>上下文,将<code>traceId</code>写入<code>http client</code>的<code>header</code>中,从而达到跨服务传递<code>traceId</code>。</p>
<p>这是一个简单的实现分布式调用追踪的实践,以上。</p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000011265918">https://segmentfault.com/a/11...</a></p>
Cobar源码解析(一)
https://segmentfault.com/a/1190000011122787
2017-09-12T00:53:17+08:00
2017-09-12T00:53:17+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
0
<h2>简介</h2>
<p>当业务的数据量和访问量急剧增加的情况下,我们需要对数据进行水平拆分,从而降低单库的压力,并且数据的水平拆分需要对业务透明,屏蔽掉水平拆分的细节。并且,前端业务的高并发会导致后端的数据库连接过多,从而DB的性能低下。</p>
<p>Cobar就是解决这些问题的一款分库分表中间件,Cobar以proxy的形式位于前端应用和后端数据库之间,Cobar对前端暴露的接口是MySQL通信协议,其将前端传输过来的SQL语句按照sharding规则路由到后端的数据库实例上,再合并多个实例返回的结果,从而模拟单库下的数据库行为。</p>
<p>Cobar的使用方法就不多介绍了,本文的主要内容是剖析Cobar的源代码。</p>
<h2>Cobar的前端连接模型</h2>
<p>结构图如下:</p>
<p><img src="/img/remote/1460000011122792" alt="alt text" title="alt text"></p>
<p>我们先来看CobarServer的代码:</p>
<pre><code>private CobarServer() {
this.config = new CobarConfig();
SystemConfig system = config.getSystem();
MySQLLexer.setCStyleCommentVersion(system.getParserCommentVersion());
this.timer = new Timer(NAME + "Timer", true);
this.initExecutor = ExecutorUtil.create("InitExecutor", system.getInitExecutor());
this.timerExecutor = ExecutorUtil.create("TimerExecutor", system.getTimerExecutor());
this.managerExecutor = ExecutorUtil.create("ManagerExecutor", system.getManagerExecutor());
this.sqlRecorder = new SQLRecorder(system.getSqlRecordCount());
this.isOnline = new AtomicBoolean(true);
this.startupTime = TimeUtil.currentTimeMillis();
}
</code></pre>
<p>上面是CobarServer的构造函数,它的限定是private的。</p>
<pre><code>private static final CobarServer INSTANCE = new CobarServer();
public static final CobarServer getInstance() {
return INSTANCE;
}</code></pre>
<p>而CobarServer又有一个私有的静态变量<code>INSTANCE</code>,以及获取这个私有静态变量的静态方法,显然,这是一个单例设计模式,使程序运行的时候全局只有一个CobarServer对象。</p>
<p>我们再来看CobarServer的startup()方法,此方法中构造了一个NIOAcceptor(绑定服务器端口,接受客户端的连接),</p>
<pre><code>server = new NIOAcceptor(NAME + "Server", system.getServerPort(), sf);</code></pre>
<p>构造了一个接收前端连接的非阻塞Acceptor,让我们在来看NIOAcceptor类的代码。</p>
<pre><code>public final class NIOAcceptor extends Thread {
private static final Logger LOGGER = Logger.getLogger(NIOAcceptor.class);
private static final AcceptIdGenerator ID_GENERATOR = new AcceptIdGenerator();
private final int port;
private final Selector selector;
private final ServerSocketChannel serverChannel;
private final FrontendConnectionFactory factory;
private NIOProcessor[] processors;
private int nextProcessor;
private long acceptCount;
public NIOAcceptor(String name, int port, FrontendConnectionFactory factory) throws IOException {
super.setName(name);
this.port = port;
this.selector = Selector.open(); # 生成选择器
this.serverChannel = ServerSocketChannel.open();
this.serverChannel.socket().bind(new InetSocketAddress(port)); # 绑定服务器端口
this.serverChannel.configureBlocking(false); # 设置非阻塞模式
this.serverChannel.register(selector, SelectionKey.OP_ACCEPT); # 监听ACCEPT事件,
this.factory = factory; # 设置前端连接的工厂
}
}</code></pre>
<p>以上的代码都是NIO编程中很常见的操作。下面我们看run()方法,</p>
<pre><code>@Override
public void run() {
final Selector selector = this.selector;
for (;;) {
++acceptCount;
try {
selector.select(1000L); # select操作是阻塞的,若没有监听到相应的事件,则一直阻塞,直到超过1000毫秒,则返回
Set<SelectionKey> keys = selector.selectedKeys();
try {
for (SelectionKey key : keys) {
if (key.isValid() && key.isAcceptable()) {
accept(); # 接受连接,这个方法很关键
} else {
key.cancel();
}
}
} finally {
keys.clear();
}
} catch (Throwable e) {
LOGGER.warn(getName(), e);
}
}
}</code></pre>
<p>以上的run方法也是常见的NIO中监听事件的套路,其中accept()方法是定义的私有函数,accept方法是为了将channel与selector绑定,代码如下,</p>
<pre><code>private void accept() {
SocketChannel channel = null;
try {
channel = serverChannel.accept(); # 为新的连接分配socket
channel.configureBlocking(false); # 设置为非阻塞模式
# factory将channel进行封装,进行相应的设置,返回一个FrontendConnection,connection本质上就是一个封装好的channel
FrontendConnection c = factory.make(channel);
c.setAccepted(true);
c.setId(ID_GENERATOR.getId()); # 为连接设置ID
NIOProcessor processor = nextProcessor(); # 为连接分配processor,NIOAcceptor中包含了一个NIOProcessor数组,分配的策略即根据下标不断后移,到达数组末尾后又从数组的起始位置开始分配
c.setProcessor(processor);
# 回调NIOProcessor的postRegister方法,而processor的postRegister调用的是NIOReactor类的postRegister方法
processor.postRegister(c);
} catch (Throwable e) {
closeChannel(channel);
LOGGER.warn(getName(), e);
}
}</code></pre>
<p>让我来看NIOProcessor的postRegister方法,</p>
<pre><code>public void postRegister(NIOConnection c) {
reactor.postRegister(c);
}</code></pre>
<p>NIOProcessor类中定义了一个NIOReactor类的成员变量reactor,而postRegister调用的是NIOReactor的postRegister方法。下面让我们来看NIOReactor的postRegister代码,</p>
<pre><code>final void postRegister(NIOConnection c) {
# 只是先将前端连接插入R线程的阻塞队列中,并没有立刻将channel与selector进行绑定
reactorR.registerQueue.offer(c);
# 唤醒R线程的selector,若之前的select操作没有返回的话则立即返回
reactorR.selector.wakeup();
}</code></pre>
<p>既然channel与selector没有立刻进行绑定,那它们是什么时候绑定的呢?我们来看NIOReactor中内部类R的run()方法,</p>
<pre><code>@Override
public void run() {
final Selector selector = this.selector;
for (;;) {
++reactCount;
try {
selector.select(1000L);
# 将connection与selector进行绑定
register(selector);
Set<SelectionKey> keys = selector.selectedKeys();
try {
for (SelectionKey key : keys) {
Object att = key.attachment();
if (att != null && key.isValid()) {
int readyOps = key.readyOps();
if ((readyOps & SelectionKey.OP_READ) != 0) {
read((NIOConnection) att);
} else if ((readyOps & SelectionKey.OP_WRITE) != 0) {
write((NIOConnection) att);
} else {
key.cancel();
}
} else {
key.cancel();
}
}
} finally {
keys.clear();
}
} catch (Throwable e) {
LOGGER.warn(name, e);
}
}
}</code></pre>
<p>在run方法中,当select方法返回的时候,就会进行channel和selector的绑定,因为当connection插入到阻塞队列中的时候,会对selector进行wakeup(),即select(1000L)方法会立即返回,所以不必担心channel会卡一秒钟才会和selector进行绑定。</p>
<p>我们再来看R线程的register方法,</p>
<pre><code>private void register(Selector selector) {
NIOConnection c = null;
# 将R线程阻塞队列中的所有连接都轮询取出,与selector进行绑定
while ((c = registerQueue.poll()) != null) {
try {
c.register(selector);
} catch (Throwable e) {
c.error(ErrorCode.ERR_REGISTER, e);
}
}
}</code></pre>
<h2>总结</h2>
<p>关于NIOAcceptor为何先将connection放入Reactor的阻塞队列,而不是直接绑定。笔者的观点是,如果由NIOAcceptor负责绑定则会造成锁竞争,selector的register方法会争用锁,会导致NIOAcceptor线程和R、W线程竞争selector的锁,若acceptor中处理绑定connection的逻辑,则NIOAcceptor就不能快速地处理大量的连接,整个系统的吞吐就会降低。所以Cobar中的设计是将connection的绑定放到R线程的阻塞队列中去,让R线程来完成connection的绑定工作。</p>
<p>图就随意看看吧-.-,有点丑。</p>
<p>以上。</p>
<hr>
<h3>原文链接</h3>
<p><a href="https://segmentfault.com/a/1190000011122787">https://segmentfault.com/a/11...</a></p>
shell命令行参数解析工具:getopts
https://segmentfault.com/a/1190000010171506
2017-07-13T15:18:35+08:00
2017-07-13T15:18:35+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
1
<h2>shell命令行参数解析工具:getopts</h2>
<p>在<code>shell</code>脚本中,对于简单的参数,我们使用<code>$1 $2 </code>来处理即可,具体如下:</p>
<pre><code>#!/bin/bash
SOFT_DIR=$1
MAVEN_DIR=$2
echo $SOFT_DIR
echo $MAVEN_DIR
-----------------
$ sh test.sh /home/soft /home/soft/maven
/home/soft
/home/soft/maven</code></pre>
<p>但是,如果你的脚本参数非常多,那使用上面的这种方式就非常不合适,你无法清楚地记得每个位置对应的是什么参数。所以,我们可以使用<code>bash</code>内置的<code>getopts</code>,下面是一个简单的例子:</p>
<pre><code>#!/bin/bash
usage() {
echo "Usage:"
echo " test.sh [-j JAVA_DIR] [-m MAVEN_DIR]"
echo "Description:"
echo " JAVA_DIR, the path of java."
echo " MAVEN_DIR, the path of maven."
exit -1
}
upload="false"
while getopts 'h:j:m:u' OPT; do
case $OPT in
j) JAVA_DIR="$OPTARG";;
m) MAVEN_DIR="$OPTARG";;
u) upload="true";;
h) usage;;
?) usage;;
esac
done
echo $JAVA_DIR
echo $MAVEN_DIR
echo $upload
---------------------------
$ sh test.sh -j /home/soft/java -m /home/soft/maven
/home/soft/java
/home/soft/maven
false
$ sh test.sh -j /home/soft/java -m /home/soft/maven -u
/home/soft/java
/home/soft/maven
true
$ sh test.sh -h
test.sh: option requires an argument -- h
Usage:
test.sh [-j JAVA_DIR] [-m MAVEN_DIR]
Description:
JAVA_DIR, the path of java.
MAVEN_DIR, the path of maven.</code></pre>
<p><code>getopts</code>后面跟的字符串就是参数列表,每个字母代表一个选项,如果字母后面跟一个<code>:</code>就表示这个选项还会有一个值,比如上面例子中对应的<code>-j /home/soft/java </code>和<code>-m /home/soft/maven </code>。而<code>getopts</code>字符串中没有跟随<code>:</code>的字母就是开关型选项,不需要指定值,等同于<code>true/false</code>,只要带上了这个参数就是<code>true</code>。</p>
<p><code>getopts</code>识别出各个选项之后,就可以配合case进行操作。操作中,有两个"常量",一个是<code>OPTARG</code>,用来获取当前选项的值;另外一个就是<code>OPTIND</code>,表示当前选项在参数列表中的位移。case的最后一项是<code>?</code>,用来识别非法的选项,进行相应的操作,我们的脚本中输出了帮助信息。</p>
<p>当选项参数识别完成以后,我们就能识别剩余的参数了,我们可以使用<code>shift</code>进行位移,抹去选项参数。</p>
<pre><code>#!/bin/bash
usage() {
echo "Usage:"
echo " test.sh [-j JAVA_DIR] [-m MAVEN_DIR]"
echo "Description:"
echo " JAVA_DIR, the path of java."
echo " MAVEN_DIR, the path of maven."
exit -1
}
upload="false"
echo $OPTIND
while getopts 'j:m:u' OPT; do
case $OPT in
j) JAVA_DIR="$OPTARG";;
m) MAVEN_DIR="$OPTARG";;
u) upload="true";;
h) usage;;
?) usage;;
esac
done
echo $OPTIND
shift $(($OPTIND - 1))
echo $1
---------------
$ sh test.sh -j /home/soft/java -m /home/soft/maven otherargs
1
5
otherargs
sh test.sh -j /home/soft/java -m /home/soft/maven -u otherargs
1
6
otherargs</code></pre>
<p>在上面的脚本中,我们位移的长度等于case循环结束后的<code>OPTIND - 1</code>,<code>OPTIND</code>的初始值为1,当选项参数处理结束后,其指向剩余参数的第一个。<code>getopts</code>在处理参数时,处理带值的选项参数,<code>OPTIND</code>加2;处理开关型变量时,<code>OPTIND</code>则加1。</p>
<p>以上就是对<code>getopts</code>的简单介绍。</p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000010171506">https://segmentfault.com/a/11...</a></p>
MySQL乐观锁在分布式场景下的实践
https://segmentfault.com/a/1190000008935924
2017-04-03T21:34:58+08:00
2017-04-03T21:34:58+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
10
<h2>MySQL乐观锁在分布式场景下的实践</h2>
<h4>背景</h4>
<p>在电商购物的场景下,当我们点击购物时,后端服务就会对相应的商品进行减库存操作。在单实例部署的情况,我们可以简单地使用JVM提供的锁机制对减库存操作进行加锁,防止多个用户同时点击购买后导致的库存不一致问题。</p>
<p>但在实践中,为了提高系统的可用性,我们一般都会进行多实例部署。而不同实例有各自的JVM,被负载均衡到不同实例上的用户请求不能通过JVM的锁机制实现互斥。</p>
<p>因此,为了保证在分布式场景下的数据一致性,我们一般有两种实践方式:一、使用MySQL乐观锁;二、使用分布式锁。</p>
<p>本文主要介绍MySQL乐观锁,关于分布式锁我在下一篇博客中介绍。</p>
<h4>乐观锁简介</h4>
<p>乐观锁(Optimistic Locking)与悲观锁相对应,我们在使用乐观锁时会假设数据在极大多数情况下不会形成冲突,因此只有在数据提交的时候,才会对数据是否产生冲突进行检验。如果产生数据冲突了,则返回错误信息,进行相应的处理。</p>
<p>那我们如何来实现乐观锁呢?一般采用以下方式:使用版本号(version)机制来实现,这是乐观锁最常用的实现方式。</p>
<h6>版本号</h6>
<p>那什么是版本号呢?版本号就是为数据添加一个版本标志,通常我会为数据库中的表添加一个int类型的"version"字段。当我们将数据读出时,我们会将version字段一并读出;当数据进行更新时,会对这条数据的version值加1。当我们提交数据的时候,会判断数据库中的当前版本号和第一次取数据时的版本号是否一致,如果两个版本号相等,则更新,否则就认为数据过期,返回错误信息。我们可以用下图来说明问题:</p>
<p><img src="/img/remote/1460000008935927?w=471&h=356" alt="alt text" title="alt text"></p>
<p>如图所示,如果更新操作如第一个图中一样顺序执行,则数据的版本号会依次递增,不会有冲突出现。但是像第二个图中一样,不同的用户操作读取到数据的同一个版本,再分别对数据进行更新操作,则用户的A的更新操作可以成功,用户B更新时,数据的版本号已经变化,所以更新失败。</p>
<h4>代码实践</h4>
<p>我们对某个商品减库存时,具体操作分为以下3个步骤:</p>
<ol>
<li><p>查询出商品的具体信息</p></li>
<li><p>根据具体的减库存数量,生成相应的更新对象</p></li>
<li><p>修改商品的库存数量</p></li>
</ol>
<p>为了使用MySQL的乐观锁,我们需要为商品表goods加一个版本号字段version,具体的表结构如下:</p>
<pre><code>CREATE TABLE `goods` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(64) NOT NULL DEFAULT '',
`remaining_number` int(11) NOT NULL,
`version` int(11) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;</code></pre>
<p>Goods类的Java代码:</p>
<pre><code>public class Goods implements Serializable {
private static final long serialVersionUID = 0L;
private Integer id;
/**
* 商品名字
*/
private String name;
/**
* 库存数量
*/
private Integer remainingNumber;
/**
* 版本号
*/
private Integer version;
@Override
public String toString() {
return "Goods{" +
"id=" + id +
", name='" + name + '\'' +
", remainingNumber=" + remainingNumber +
", version=" + version +
'}';
}
}</code></pre>
<p>GoodsMapper.java:</p>
<pre><code>public interface GoodsMapper {
Integer updateGoodCAS(Goods good);
}</code></pre>
<p>GoodsMapper.xml如下:</p>
<pre><code><update id="updateGoodCAS" parameterType="com.ztl.domain.Goods">
<![CDATA[
update goods
set `name`=#{name},
remaining_number=#{remainingNumber},
version=version+1
where id=#{id} and version=#{version}
]]>
</update></code></pre>
<p>GoodsService.java 接口如下:</p>
<pre><code>public interface GoodsService {
@Transactional
Boolean updateGoodCAS(Integer id, Integer decreaseNum);
}</code></pre>
<p>GoodsServiceImpl.java类如下:</p>
<pre><code>@Service
public class GoodsServiceImpl implements GoodsService {
@Autowired
private GoodsMapper goodsMapper;
@Override
public Boolean updateGoodCAS(Integer id, Integer decreaseNum) {
Goods good = goodsMapper.selectGoodById(id);
System.out.println(good);
try {
Thread.sleep(3000); //模拟并发情况,不同的用户读取到同一个数据版本
} catch (InterruptedException e) {
e.printStackTrace();
}
good.setRemainingNumber(good.getRemainingNumber() - decreaseNum);
int result = goodsMapper.updateGoodCAS(good);
System.out.println(result == 1 ? "success" : "fail");
return result == 1;
}
}</code></pre>
<p>GoodsServiceImplTest.java测试类</p>
<pre><code>@RunWith(SpringRunner.class)
@SpringBootTest
public class GoodsServiceImplTest {
@Autowired
private GoodsService goodsService;
@Test
public void updateGoodCASTest() {
final Integer id = 1;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
goodsService.updateGoodCAS(id, 1); //用户1的请求
}
});
thread.start();
goodsService.updateGoodCAS(id, 2); //用户2的请求
System.out.println(goodsService.selectGoodById(id));
}
}</code></pre>
<p>输出结果:</p>
<pre><code>Goods{id=1, name='手机', remainingNumber=10, version=9}
Goods{id=1, name='手机', remainingNumber=10, version=9}
success
fail
Goods{id=1, name='手机', remainingNumber=8, version=10}</code></pre>
<p>代码说明:</p>
<p>在updateGoodCASTest()的测试方法中,用户1和用户2同时查出id=1的商品的同一个版本信息,然后分别对商品进行库存减1和减2的操作。从输出的结果可以看出用户2的减库存操作成功了,商品库存成功减去2;而用户1提交减库存操作时,数据版本号已经改变,所以数据变更失败。</p>
<p>这样,我们就可以通过MySQL的乐观锁机制保证在分布式场景下的数据一致性。</p>
<p>以上。</p>
<h4>原文链接</h4>
<p><a href="https://segmentfault.com/a/1190000008935924">https://segmentfault.com/a/11...</a></p>
Restful API 设计规范实战
https://segmentfault.com/a/1190000007313505
2016-10-28T17:07:26+08:00
2016-10-28T17:07:26+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
7
<h2>Restful API 设计规范</h2>
<h4>使用的名词而不是动词</h4>
<p>不应该使用动词:</p>
<p><code><del>/getAllResources</del></code> <br><code><del>/createNewResources</del></code> <br><code><del>/deleteAllResources</del></code></p>
<h4>GET方法和查询参数不能改变资源状态:</h4>
<p>如果要改变资源的状态,使用PUT、POST、DELETE。下面是错误的用GET方法来修改user的状态:</p>
<pre><code>GET /users/711?activate
GET /users/711/activate
</code></pre>
<h4>Rest的核心原则是将你的API拆分为逻辑上的资源。这些资源通过HTTP被操作(GET,POST,PUT,DELETE)</h4>
<p>我们定义资源ticket、user、group:</p>
<ul>
<li><p>GET /tickets # 获取ticket列表</p></li>
<li><p>GET /tickets/12 # 查看某个具体的ticket</p></li>
<li><p>POST /tickets # 新建一个ticket</p></li>
<li><p>PUT /tickets/12 #新建ticket 12</p></li>
<li><p>DELETE /tickets/12 # 删除ticket 12</p></li>
</ul>
<p>只需要一个endpoint:/tickets,再也没有其他什么命名规则和url规则了。</p>
<p>一个可以遵循的规则是:虽然看起来使用复数来描述某一个资源看起来特别扭,但是统一所有的endpoint,使用复数使得你的URL更加规整。这让API使用者更加容易理解,对开发者来说也更容易实现。</p>
<p><strong>处理关联:</strong></p>
<ul>
<li><p>GET /tickets/12/messages # 获取ticket 12的message列表</p></li>
<li><p>GET /tickets/12/messages/5 #获取ticket 12的message 5</p></li>
<li><p>POST /tickets/12/messages 创建ticket 12的一个message</p></li>
<li><p>PUT /tickets/12/messages/5 更新ticket 12的message 5</p></li>
<li><p>DELETE /tickets/12/messages/5 删除ticket 12的message 5</p></li>
</ul>
<h4>避免层级过深的URI</h4>
<p><code>/</code> 在url中表达层级,用于按实体关联关系进行对象导航,一般根据id导航。</p>
<p>过深的导航容易导致url膨胀,不易维护,如 <code>GET /zoos/1/areas/3/animals/4</code>,尽量使用查询参数代替路劲中的实体导航,如<code>GET /animals?zoo=1&area=3</code>。</p>
<h4>结果过滤,排序,搜索</h4>
<p>url最好越简短越好,对结果过滤、排序、搜索相关的功能都应该通过参数实现。</p>
<p><strong>过滤</strong>:例如你想限制<code>GET /tickets</code> 的返回结果:只返回那些open状态的ticket, <code>GET /tickets?state=open</code> 这里的state就是过滤参数。</p>
<p><strong>排序</strong>:和过滤一样,一个好的排序参数应该能够描述排序规则,而不和业务相关。复杂的排序规则应该通过组合实现。排序参数通过 <code>,</code> 分隔,排序参数前加 <code>-</code> 表示降序排列。</p>
<ul>
<li><p>GET /tickets?sort=-priority #获取按优先级降序排列的ticket列表</p></li>
<li><p>GET /tickets?sort=-priority,created_at #获取按优先级降序排列的ticket列表,在同一个优先级内,先创建的ticket排列在前面。</p></li>
</ul>
<p><strong>搜索</strong>:有些时候简单的排序是不够的。我们可以使用搜索技术来实现</p>
<ul><li><p>GET /tickets?q=return&state=open&sort=-priority,create_at # 获取优先级最高且打开状态的ticket,而且包含单词return的ticket列表。</p></li></ul>
<h4>限制API返回值的域</h4>
<p>有时候API使用者不需要所有的结果,在进行横向限制的同时(例如值返回API结果的前十个),还应该可以进行纵向限制,并且这个功能能有效的提高网络带宽使用率和速度。可以使用fields查询参数来限制返回的域例如:</p>
<ul><li><p>GET /tickets?fields=id,subject,customer_name,updated_at&state=open&sort=-updated_at</p></li></ul>
<h4>Response不要包装</h4>
<p>response 的 body直接就是数据,不要做多余的包装。错误实例:</p>
<pre><code>{
"success":true,
"data":{"id":1, "name":"xiaotuan"}
}
</code></pre>
<h4>更新和创建操作应该返回资源</h4>
<p>在POST操作以后,返回201created 状态码,并且包含一个指向新资源的url作为返回头。</p>
<h4>命名方式</h4>
<p>是蛇形命名还是驼峰命名?如果使用json那么最好的应该是遵守JavaScript的命名方法-驼峰命名法。Java、C# 使用驼峰,python、ruby使用蛇形。</p>
<h4>默认使用pretty print格式,开启gzip</h4>
<p>开启pretty print返回结果会更加友好易读,而且额外的传输也可以忽略不计。如果忘了使用gzip那么传输效率将会大大减少,损失大大增加。</p>
<h2>GitHub v3S实践经验</h2>
<h4>1.Current Version</h4>
<p>通过Accept字段来区分版本号,而不是在url中嵌入版本号: <br><code>Accept: application/vnd.github.v3+json</code></p>
<h4>2.Schema</h4>
<p><strong>Summary Representation</strong></p>
<p>当你请求获取某一资源的列表时,响应仅返回资源的属性子集。有些属性对API来说代价是非常高的,出于性能的考虑,会排除这些属性。要获取这些属性,请求"detailed" representation。</p>
<p><strong>Example:</strong>当你获取仓库的列表时,你获得的是每个仓库的summary representation。</p>
<pre><code>GET /orgs/octokit/repos
</code></pre>
<p><strong>Detailed Representation</strong></p>
<p>当你获取一个单独的资源时,响应会返回这个资源的所有属性。</p>
<p><strong>Example:</strong>当你获取一个单独的仓库,你会获得这个仓库的detailed representation。</p>
<pre><code>GET /repos/octokit/octokit.rb
</code></pre>
<h4>3.Parameters</h4>
<p>许多API都带有可选参数。对于GET请求,任何不作为路径构成部分的参数都可以通过HTTP查询参数传入。</p>
<pre><code>GET https://api.github.com/repos/vmg/redcarpet/issues?state=closed
</code></pre>
<p>在这个例子中,'vmg' 和 'redcarpet' 作为 <strong>:owner</strong> 和 <strong>:repo</strong> 的参数,而 <strong>:state</strong> 作为查询参数。</p>
<p>对于POST、PATCH、PUT和DELETE的请求,不包含在URL中的参数需要编码成JSON传递,且 Content-Type为 'application/json'。</p>
<h4>Root Endpoint</h4>
<p>你可以对根节点GET请求,获取根节点下的所有API分类。</p>
<h4>Client Errors</h4>
<p>有三种可能的客户端错误,在接收到请求体时:</p>
<p><strong>1 </strong>发送非法JSON会返回 <strong>400 Bad Request.</strong></p>
<pre><code>HTTP/1.1 400 Bad Request
Content-Length: 35
{"message":"Problems parsing JSON"}
</code></pre>
<p><strong>2 </strong>发送错误类型的JSON值会返回 <strong>400 Bad Request.</strong></p>
<pre><code>HTTP/1.1 400 Bad Request
Content-Length: 40
{"message":"Body should be a JSON object"}
</code></pre>
<p><strong>3 </strong>发送无效的值会返回 <strong>422 Unprocessable Entity.</strong></p>
<pre><code>HTTP/1.1 422 Unprocessable Entity
Content-Length: 149
{
"message": "Validation Failed",
"errors": [
{
"resource": "Issue",
"field": "title",
"code": "missing_field"
}
]
}
</code></pre>
<p>我们可以告诉发生了什么错误,下面是一些可能的验证错误码:</p>
<table>
<thead><tr>
<th>Error Name</th>
<th>Description</th>
</tr></thead>
<tbody>
<tr>
<td>missing</td>
<td>资源不存在</td>
</tr>
<tr>
<td>missing_field</td>
<td>资源必需的域没有被设置</td>
</tr>
<tr>
<td>invalid</td>
<td>域的格式非法</td>
</tr>
<tr>
<td>already_exists</td>
<td>另一个资源的域的值和此处的相同,这会发生在资源有唯一的键的时候</td>
</tr>
</tbody>
</table>
<h4>HTTP Redirects</h4>
<p>API v3在合适的地方使用HTTP重定向。客户端应该假设任何请求都会导致重定向。重定向在响应头中有一个 <code>Location</code> 的域,此域包含了资源的真实位置。</p>
<h4>HTTP Verbs</h4>
<p>API v3力争使用正确的HTTP动词来表示每次请求。</p>
<table>
<thead><tr>
<th>Verb</th>
<th>Description</th>
</tr></thead>
<tbody>
<tr>
<td>HEAD</td>
<td>对任何资源仅请求头信息</td>
</tr>
<tr>
<td>GET</td>
<td>获取资源</td>
</tr>
<tr>
<td>POST</td>
<td>创建资源</td>
</tr>
<tr>
<td>PATCH</td>
<td>使用部分的JSON数据更新资源</td>
</tr>
<tr>
<td>PUT</td>
<td>取代资源或资源集合</td>
</tr>
<tr>
<td>DELETE</td>
<td>删除资源</td>
</tr>
</tbody>
</table>
<h4>Hypermedia</h4>
<p>很多资源有一个或者更多的 <code>*_url</code> 属性指向其他资源。这意味着服务端提供明确的URL,这样客户端就不必要自己构造URL了。</p>
<h4>Pagination</h4>
<p>请求资源列表时会进行分页,默认每页30个。当你请求后续页的时候可以使用 <code>?page</code> 参数。对于某些资源,你可以通过参数 <code>?per_page</code>自定义每页的大小。</p>
<pre><code>curl 'https://api.github.com/user/repos?page=2&per_page=100'
</code></pre>
<p>需要注意的一点是,页码是从1开始的,当省略参数 <code>?page</code> 时,会返回首页。</p>
<h4>Basics of Pagination</h4>
<p>关于分页的其他相关信息在响应的头信息的 <code>Link</code> 里提供。比如,去请求一个搜索的API,查找Mozilla的项目中哪些包含词汇addClass :</p>
<pre><code>curl -I "https://api.github.com/search/code?q=addClass+user:mozilla"
</code></pre>
<p>头信息中Link字段如下:</p>
<pre><code>Link: <https://api.github.com/search/code?q=addClass+user%3Amozilla&page=2>; rel="next", <https://api.github.com/search/code?q=addClass+user%3Amozilla&page=34>; rel="last"
</code></pre>
<p><code>rel="next"</code> 表示下一页是 <code>page=2</code>。也就是说,默认情况下所有的分页请求都是从首页开始。<code>rel="last"</code> 提供更多信息,表示最后一页是34。即我们还有33页的信息包含addClass。</p>
<p>总之,我们应该依赖于Link提供的信息,而不要尝试自己去猜或者构造URL。</p>
<h4>Navigating through the pages</h4>
<p>既然已经知道会接收多少页面,我们可以通过页面导航来消费结果。我们可以通过传递一个<code>page</code>参数,例如跳到14页:</p>
<pre><code>curl -I "https://api.github.com/search/code?q=addClass+user:mozilla&page=14"
</code></pre>
<p>这是头信息中Link字段:</p>
<pre><code>Link: <https://api.github.com/search/code?q=addClass+user%3Amozilla&page=15>; rel="next",
<https://api.github.com/search/code?q=addClass+user%3Amozilla&page=34>; rel="last",
<https://api.github.com/search/code?q=addClass+user%3Amozilla&page=1>; rel="first",
<https://api.github.com/search/code?q=addClass+user%3Amozilla&page=13>; rel="prev"
</code></pre>
<p>我们会获得更多的信息,<code>rel="first"</code>表示首页,<code>rel="prev"</code>表示前一页的页码。通过这些信息,我们可以构造一个UI界面让用户在first、previous、next、last之间进行跳转。</p>
<h4>Rate Limiting</h4>
<p>对于认证的请求,可以每小时最多请求5000次。对于没有认证的请求,限制在每小时60次请求。</p>
<p>检查返回的HTTP头,可以看到当前的速率限制:</p>
<pre><code>curl -i https://api.github.com/users/whatever
HTTP/1.1 200 OK
Server: GitHub.com
Date: Thu, 27 Oct 2016 03:05:42 GMT
Content-Type: application/json; charset=utf-8
Content-Length: 1219
Status: 200 OK
X-RateLimit-Limit: 60
X-RateLimit-Remaining: 48
X-RateLimit-Reset: 1477540017
</code></pre>
<p>header头信息告诉你当前的速率限制状态:</p>
<table>
<thead><tr>
<th>Header Name</th>
<th>Description</th>
</tr></thead>
<tbody>
<tr>
<td>X-RateLimit-Limit</td>
<td>当前用户被允许的每小时请求数</td>
</tr>
<tr>
<td>X-RateLimit-Remaining</td>
<td>在当前发送窗口内还可以发送的请求数</td>
</tr>
<tr>
<td>X-RateLimit-Reset</td>
<td>按当前速率发送后,发送窗口重置的时间</td>
</tr>
</tbody>
</table>
<p>一旦你超过了发送速率限制,你会收到一个错误响应:</p>
<pre><code>HTTP/1.1 403 Forbidden
Date: Tue, 20 Aug 2013 14:50:41 GMT
Status: 403 Forbidden
X-RateLimit-Limit: 60
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1377013266
{
"message": "API rate limit exceeded for xxx.xxx.xxx.xxx. (But here's the good news: Authenticated requests get a higher rate limit. Check out the documentation for more details.)",
"documentation_url": "https://developer.github.com/v3/#rate-limiting"
}
</code></pre>
<h4>User Agent Required</h4>
<p>所有的API请求必须包含一个有效的 <code>User-Agent</code> 头。请求头不包含<code>User-Agent</code>的请求会被拒绝。</p>
<h4>Conditional requests</h4>
<p>大多数响应都会返回一个 <code>ETag</code> 头。很多响应也会返回一个 <code>Last-Modified</code> 头。你可以使用这些头信息对这些资源进行后续请求,分别使用 <code>If-None-Match</code> 和 <code>If-Modified-Since</code>头。如果资源没有发生改变,服务器端会返回 <code>304 Not Modified</code>。</p>
<h2>Enchant REST API 实践经验</h2>
<h4>Requests</h4>
<p><strong>Limited HTTP Clients</strong></p>
<p>如果你使用的HTTP客户端不支持PUT、PATCH、DELETE方法,发送一个POST请求,头信息里包含X-HTTP-Method-Override字段,它的值是实际需要的动词。</p>
<pre><code>$ curl -u email:password https://site.enchant.com/api/v1/users/543abc \
-X POST \
-H "X-HTTP-Method-Override: DELETE"
</code></pre>
<h4>Rate Limiting</h4>
<p>所有响应的头部包含描述当前限流状态的字段:</p>
<pre><code>Rate-Limit-Limit: 100
Rate-Limit-Remaining: 99
Rate-Limit-Used: 1
Rate-Limit-Reset: 20
</code></pre>
<ul>
<li><p><code>Rate-Limit-Limit</code> - 当前时间段内允许的总的请求数</p></li>
<li><p><code>Rate-Limit-Remaining</code> - 当前时间段内还剩余的请求数</p></li>
<li><p><code>Rate-Limit-Used</code> - 本次所使用的请求数</p></li>
<li><p><code>Rate-Limit-Reset</code> - 重置所需秒数</p></li>
</ul>
<p>如果速率限制被打破,API会返回 <code>429 Too Many Requests</code> 的状态码。在这种情况下,你的应用不应该再发送任何请求直到 <code>Rate-Limit-Reset</code> 所规定的时间过去。</p>
<h4>Field Filtering</h4>
<p>你可以自己限制响应返回的域。只需要你传递一个 <code>fields</code> 参数,用逗号分隔所需要的域,比如:</p>
<pre><code>GET /api/v1/users?fields=id,first_name
</code></pre>
<h4>Counting</h4>
<p>所有返回一个集合的URL,都会提供count统计所有结果的个数。要获取count值需要加一个 <code>count=true</code> 的参数。count会在消息头中的<code>Total-Count</code> 字段中返回。</p>
<p><code>GET /api/v1/tickets?count=true</code></p>
<pre><code>200 OK
Total-Count: 135
Rate-Limit-Limit: 100
Rate-Limit-Remaining: 98
Rate-Limit-Used: 2
Rate-Limit-Reset: 20
Content-Type: application/json</code></pre>
<hr>
<pre><code>[
... results ...
]
</code></pre>
<p>count表示所有现存结果的数量,而不是此次响应返回的结果的数量。</p>
<h4>Enveloping</h4>
<p>如果你的HTTP客户端难以读取状态码和头信息,我们可以将所有都打包进响应消息体中。我们只需要传递参数 <code>envelope=true</code>,而API会始终返回200的HTTP状态码。真正的状态码、头信息和响应都在消息体中。</p>
<pre><code>GET /api/v1/users/does-not-exist?envelope=true</code></pre>
<hr>
<pre><code>200 OK</code></pre>
<hr>
<pre><code>{
"status": 404,
"headers": {
"Rate-Limit-Limit": 100,
"Rate-Limit-Remaining": 50,
"Rate-Limit-Used": 0,
"Rate-Limit-Reset": 25
},
"response": {
"message": "Not Found"
}
}
</code></pre>
<p><strong>其他如 分页、排序等,enchant的设计规范和GitHub v3大致相同,不在赘述。</strong></p>
<h2>原文链接</h2>
<p><a href="https://segmentfault.com/a/1190000007313505">https://segmentfault.com/a/11...</a></p>
建造者(Builder)模式 的若干使用场景
https://segmentfault.com/a/1190000005368013
2016-05-28T16:22:08+08:00
2016-05-28T16:22:08+08:00
扑火的蛾
https://segmentfault.com/u/butterflyofzhang
0
<h2>1.场景一</h2>
<p>如果我们需要将一个复杂对象的构建与它的表示分离,使得同样的构建过程可以创建不同的表示的意图时,我们可以使用 Builder模式,又叫生成器模式。如果我们用了Builder模式,那么用户就只需要指定需要建造的类型就可以得到它们,而具体建造的过程和细节就不需要知道了。<br>比如现在我们有一个这样的使用场景,需要在屏幕上画小人,人要有头手脚,要画不同的人,胖的小人,瘦的小人,矮的小人。按照通常的写法,会有很多的样板代码,画人的头,画人脚手,如果一不小心,非常容易缺胳膊少腿。<br>下面我们演示Builder模式的实现:</p>
<pre><code>public class Person {
//限于篇幅get和set方法此处省略
Head head;
Body body;
Arm leftArm;
Arm rightArm;
Leg leftLeg;
Leg rightLeg;
public void drawHead(int size){...}
public void drawBody(int size){...}
public void drawLeftArm(int size){...}
public void drawRightArm(int size){...}
public void drawLeftLeg(int size){...}
public void drawRightLeg(int size){...}
}
abstract class BuilderPerson {
protected Person person = new Person();
public abstract void buildHead();
public abstract void buildBody();
public abstract void buildLeftArm();
public abstract void buildRightArm();
public abstract void buildLeftLeg();
public abstract void buildRightLeg();
}
public class BuilderThinPerson extends BuilderPerson{
@Override
public void buildHead() {
person.drawHead(10);
}
@Override
public void buildBody() {
person.drawBody(10); //画胖小人只需将这边的数值修改,
// 再生成一个类即可
}
@Override
public void buildLeftArm() {
person.drawLeftArm(5);
}
@Override
public void buildRightArm() {
person.drawRightArm(5);
}
@Override
public void buildLeftLeg() {
person.drawLeftLeg(7);
}
@Override
public void buildRightLeg() {
person.drawRightLeg(7);
}
}
我们还缺Builder模式中一个非常重要的类,指挥者(Director),用它来控制建造过程,也用来隔离用户与建造过程的关联。
public class PersonDirector{
private BuilderPerson pb;
public PersonDirector(BuilderPerson pb){
this.pb = pb;
}
//建造的过程在指挥者这里完成,用户就不需要知道了
public void createPerson() {
pb.buildHead();
pb.buildBody();
pb.buildLeftArm();
pb.buildRightArm();
pb.buildLeftLeg();
pb.buildRightLeg();
}
}
客户端代码
BuilderPerson bp = new BuilderThinPerson();
PersonDirector pd = new PersonDirector(bp);
pd.createPerson();
</code></pre>
<h2>2.场景二</h2>
<p>遇到多个构造器参数时要考虑用构建器。静态工厂和构造器有个共同的局限性:它们都不能很好地扩展到大量的可选参数。<br>考虑这样的一个场景:用一个类表示包装食品外面显示的营养成分标签。这些标签中有几个域是必需的:每份的含量、每罐的含量以及每份的卡路里,还有超过20个可选域:总脂肪量、饱和脂肪量、转化脂肪、胆固醇、钠等等。<br>程序员一向习惯采用重叠构造器模式,在这种模式下,你提供第一个只有必要参数的构造器,第二个构造器有一个可选参数,第三个有两个可选参数,以此类推,最后一个构造器包含所有可选参数。<strong>重叠构造器模式可行,但是当有许多参数的时候,客户端代码会很难编写,并且仍然难以阅读。一长串类型相同的参数会导致一些微妙的错误。如果客户端不小心颠倒了其中两个参数的顺序,编译器也不会出错,但是程序在运行时会出现错误的行为。</strong><br>下面是Builder模式的代码:</p>
<pre><code>public class NutritionFacts {
private final int servingSize;
private final int servings;
private final int calories;
private final int fat;
private final int sodium;
private final int carbohydrate;
public static class Builder {
//required parameters
private final int servingSize;
private final int servings;
//Optional parameters - initialized to default values
private int calories;
private int fat;
private int sodium;
private int carbohydrate;
public Builder(int servingSize, int servings){
this.servingSize = servingSize;
this.servings = servings
}
public Builder calories(int val){
calories = val;
return this;
}
public Builder fat(int val){
fat = val;
return this;
}
public Builder sodium(int val) {
sodium = val;
return this;
}
public Builder carbohydrate(int val){
carbohydrate = val;
return this;
}
public NutritionFacts build() {
return new NutritionFacts(this);
}
}
private NutritionFacts(Builder builder){
servingSize = builder.servingSize;
servings = builder.servings;
calories = builder.calories;
fat = builder.fat;
sodium = builder.sodium;
carbohydrate = builder.carbohydrate;
}
}</code></pre>
<p>NutritionFacts是不可变的。builder的setter方法返回builder本身,以便可以把调用链接起来。下面就是客户端代码:</p>
<pre><code>NutritionFacts juice = new NutritionFacts.Builder(240, 8).
calories(100).sodium(35).carbohydrate(27).build();</code></pre>
<p>这样的客户端代码很容易编写,更重要的是,易于阅读。</p>
<p>参考: <br>1.《大话设计模式》<br>2.《Effective Java 第二版》</p>