SegmentFault 人生苦短,我用Python最新的文章
2016-01-22T15:59:44+08:00
https://segmentfault.com/feeds/blogs
https://creativecommons.org/licenses/by-nc-nd/4.0/
如何在 Tornado 中实现 Middleware
https://segmentfault.com/a/1190000004351569
2016-01-22T15:59:44+08:00
2016-01-22T15:59:44+08:00
风飞扬
https://segmentfault.com/u/fengfeiyang
1
<h2>定义 Middleware</h2>
<pre><code class="python"> class MiddleWare(object):
def process_request(self, request):
#request是一个RequestHandler的实例
pass
def process_response(self, request):
pass
</code></pre>
<h2>实现登录检查 Middleware</h2>
<pre><code class="python">
class AuthorizationException(Exception):
def __init__(self, msg):
super(AuthorizationException, self).__init__(403, msg)
class CheckLogin(MiddleWare):
def is_login(self, requesthandler):
return True
def process_request(self, request):
if must_login(request):
if not self.is_login(request):
raise AuthenticationException(403, "login required")
def is_login(self, request):
return request.get_session() != None</code></pre>
<h2>找地方注册 Middleware</h2>
<ol><li><p>可以在 Tornado 初始化的时候将Middleware注册到 Application 中</p></li></ol>
<h2>调用 Middlware</h2>
<pre><code class="python">class MiddleRequestHandler(RequestHandler):
def prepare(self):
for middleware in self.app.middleware:
middleware.process_request(self)
def finish(self):
for middleware in self.app.middleware:
middleware.process_response(self)
super(MiddleRequestHandler, self).finish()</code></pre>
<h2>可以运行的例子</h2>
<ol>
<li><p>实现登录检查</p></li>
<li><p>在线统计</p></li>
</ol>
<p><a href="https://gist.github.com/alex8224/7e4ca9a27727cf2a86b6">https://gist.github.com/alex8224/7e4ca9a27727cf2a86b6</a></p>
将任意Bytecode注入运行中的Python进程
https://segmentfault.com/a/1190000002783940
2015-05-21T16:11:40+08:00
2015-05-21T16:11:40+08:00
风飞扬
https://segmentfault.com/u/fengfeiyang
4
<p>在调试 Python 程序的时候,一般我们只能通过以下几种方式进行调试:</p>
<ol>
<li>程序中已经有的日志</li>
<li>在代码中插入 import pdb; pdb.set_trace()</li>
</ol>
<p>但是以上的方法也有不方便的地方, 比如对于已经在运行中的程序, 就不可能停止程序后加入 调试代码和增加新的日志.</p>
<p>从 JAVA 的 <strong>BTrace(<a rel="nofollow" href="https://kenai.com/projects/btrace">https://kenai.com/projects/btrace</a>)</strong> 项目得到灵感,尝试对正在运行的 <strong>Python</strong> 进程插入代码,在程序运行到指定的函数后,自动连接远程主机进行调试</p>
<p>首先介绍三个开源的项目, 本实验需要用到这三个项目</p>
<ol>
<li>
<strong>Pyasite</strong> <a rel="nofollow" href="https://github.com/lmacken/pyrasite">https://github.com/lmacken/pyrasite</a> Tools for injecting code into running Python processes</li>
<li>
<strong>Byteplay</strong> <a rel="nofollow" href="https://github.com/serprex/byteplay">https://github.com/serprex/byteplay</a> 一个字节码维护项目,类似 java的asm/cglib</li>
<li>
<strong>Rpdb-Shell</strong> <a rel="nofollow" href="https://github.com/alex8224/Rpdb-Shell">https://github.com/alex8224/Rpdb-Shell</a>
</li>
</ol>
<p>待注入的代码, 用官方的 <code>tornado hello demo</code> 做例子</p>
<pre><code>import tornado.ioloop
import tornado.web
import os
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
application = tornado.web.Application([
(r"/", MainHandler),
])
if __name__ == "__main__":
application.listen(8888)
print(os.getpid())
tornado.ioloop.IOLoop.instance().start()
</code></pre>
<p>注入以下代码(<code>testinject.py</code>)到 <strong>get</strong> 中</p>
<pre><code>import sys
import dis
import inspect
from byteplay import *
def wearedcode(fcode):
c = Code.from_code(fcode)
if c.code[1] == (LOAD_CONST, 'injected'):
return fcode
c.code[1:1] = [
(LOAD_CONST, injected'), (STORE_FAST, 'name'),
(LOAD_FAST, 'name'),
(PRINT_ITEM, None), (PRINT_NEWLINE, None),
(LOAD_CONST, -1), (LOAD_CONST, None),
(IMPORT_NAME, 'rpdb'), (STORE_FAST, 'rpdb'),
(LOAD_FAST, 'rpdb'), (LOAD_ATTR, 'trace_to_remote'),
(LOAD_CONST, '192.168.1.1'), (CALL_FUNCTION, 1),
(POP_TOP, None)
]
return c.to_code()
def trace(frame, event, arg):
if event != 'call':
return
co = frame.f_code
func_name = co.co_name
if func_name == "write":
return
if func_name == "get":
import tornado.web
args = inspect.getargvalues(frame)
if 'self' in args.locals:
if isinstance(args.locals['self'], tornado.web.RequestHandler):
getmethod = args.locals['self'].get
code = getmethod.__func__.__code__
getmethod.__func__.__code__ = wearedcode(code)
return
sys.settrace(trace)
</code></pre>
<h2>环境</h2>
<ol>
<li>ubuntu 14.04 64bit LTS</li>
<li>Python 2.7.6</li>
</ol>
<h2>步骤</h2>
<ol>
<li>在机器上安装上面需要用到的三个项目</li>
<li>python server.py</li>
<li>在 <code>192.168.1.1</code> 执行 <code>nc -l 4444</code>
</li>
<li>pyrasite $(ps aux |grep server.py |grep -v grep|awk '{print $2}') testinject.py</li>
<li>执行 curl <a rel="nofollow" href="http://localhost:8000">http://localhost:8000</a> 两次, 在第二次请求时替换的 <code>bytecode</code> 才会生效</li>
</ol>
<h2>结果</h2>
<p>在执行上面的步骤后, 在执行第二次 curl <a rel="nofollow" href="http://127.0.0.1:8000">http://127.0.0.1:8000</a> 后, 应该能够看到控制台输入 injected 的字样,并且 nc -l 4444 监听的终端会出现 <code>(pdb)></code> 的字样, 这样就能够对正在运行中的程序进行调试了.</p>
<h2>原理</h2>
<p><code>Pyasite</code> 可以注入代码到运行中的 Python 进程,它利用了 Python 的 <code>PyRun_SimpleString</code> 这个API插入代码, 至于进程注入应该是使用了 <code>ptrace</code><br><code>Byteplay</code> 是一个可以维护 Python bytecode的工具, 这部分跟 cglib/asm类似</p>
<p><code>Pyasite</code> 只能把代码注入到进程中并运行,不能定位到具体的函数并注入 bytecode, 在 <code>testinject.py</code> 中结合 Byteplay 完成了函数定位和替换 get 函数字节码的功能.</p>
<p>函数的定位用到了 sys.settrace 这个API,他提供了 <strong>call, line, return, exception</strong>事件,在合适的时机调用用户提供的函数, 具体可以参考 <a rel="nofollow" href="https://docs.python.org/2/library/sys.html#sys.settrace">https://docs.python.org/2/library/sys.html#sys.settrace</a> 的解释</p>
<p>理论上可以插入任意字节码到程序中的任意位置, 实现对现有进程中代码的任意修改.</p>
Vim窗口布局保存插件
https://segmentfault.com/a/1190000000447950
2014-03-27T18:25:39+08:00
2014-03-27T18:25:39+08:00
风飞扬
https://segmentfault.com/u/fengfeiyang
1
<p>Vim 的分屏很好用,可以开多个窗口对照代码,但是分的屏多了,需要临时扩大某个窗口到最大,编辑代码,使用完成之后,又想要恢复原先已经设置好的布局,Vim本身没有提供这样的功能,但是提供了实现这种功能的基础, 下面的代码就实现了这样的功能. 将下面的代码保存为vimlayout.vim放到 Vim的plugin目录下, 设置自己喜欢的绑定键就可以了工作了. 此代码在 Vim 7.3中测试通过.</p>
<pre><code>if exists("g:vimlayoutloaded")
finish
else
let g:vimlayoutloaded=1
endif
function! HeightToSize(height)
let currwinno=winnr()
if winheight(currwinno)>a:height
while winheight(currwinno)>a:height
execute "normal \<c-w>-"
endwhile
elseif winheight(currwinno)<a:height
while winheight(currwinno)<a:height
execute "normal \<c-w>+"
endwhile
endif
endfunction
function! WidthToSize(width)
let currwinno=winnr()
if winwidth(currwinno)>a:width
while winwidth(currwinno)>a:width
execute "normal \<c-w><"
endwhile
elseif winwidth(currwinno)<a:width
while winwidth(currwinno)<a:width
execute "normal \<c-w>>"
endwhile
endif
endfunction
function! TweakWinSize(orgisize)
call HeightToSize(a:orgisize[0])
call WidthToSize(a:orgisize[1])
endfunction
function! RestoreWinLayout()
if exists("g:layout")
let winno=1
let orgiwinno=winnr()
for win in g:layout
execute "normal \<c-w>w"
let currwinno=winnr()
if currwinno!=1 && currwinno!=orgiwinno
call TweakWinSize(g:layout[currwinno-1])
endif
endfor
unlet g:layout
endif
endfunction
function! SaveWinLayout()
let wnumber=winnr("$")
let winlist=range(wnumber)
let winno=0
let layout=[]
for index in winlist
let winno+=1
let wininfo=[winheight(winno),winwidth(winno)]
call add(layout,wininfo)
endfor
let g:layout=layout
endfunction
function! ToggleMaxWin()
if exists("g:layout")
if winnr("$")==len(g:layout)
call RestoreWinLayout()
else
call SaveWinLayout()
execute "normal 200\<c-w>>"
execute "normal \<c-w>_"
call RestoreWinLayout()
endif
else
call SaveWinLayout()
execute "normal 200\<c-w>>"
execute "normal \<c-w>_"
endif
endfunction
</code></pre>