Leo Shang 的 BLOG

随笔


  • 首页

  • 归档

  • 标签

当我们在谈论配置终端,我们在谈些什么

发表于 2016-10-20   |  

程序员经常使用终端,甚至只在终端下工作,那么终端的顺手程度就很重要了。不可避免的,大多数程序员都掌握了配置终端的技巧,但从目前我看到的文章来看,没有比较系统的总结。本文试图通过梳理终端相关的背景知识,整理一个清晰的配置脉络。

我们配置的是终端,还是shell?

通常我们说配置终端的时候,其实隐含的意思是配置shell解释器和终端(terminal),以及常用软件(ls,grep)。为什么这三者都要配置呢?什么配置是属于shell解释器的,什么是由终端软件管理的,什么是软件的配置呢?

想要弄明白这些问题,就需要我们知道他们之间的关系。

Stdstreams-notitle

shell解释器是terminal进程的一个子进程,shell解释器通过标准输入和标准输出与terminal设备文件交互的,在shell下运行的软件,是shell的子进程,也是通过标准输入输出和terminal设备文件进行交互。值得一提,在类UNIX下,虚拟终端也是一个设备,设备文件形如/dev/ttys。

明白这个关系,我们就知道配置的边界了。比如我想修改一下显示的字体,很显然,shell的标准输出不可能输出字体格式的,我们需要在terminal的配置里找到字体配置需要的字体。

需要注意的是,shell解释器并不特指bash,一台类UNIX操作系统,往往可以支持多种shell解释器。通过cat /etc/shells 可以查看本机支持的shell解释器。另外,不同shell解释器的配置文件也不一样,以bash为例,~/.bash_profile ~/.bashrc是它的配置文件,而zsh则是~/.zsh_profile ~/.zshrc

颜色是怎么显示出来的?

毫无疑问,配置终端的重点在于颜色搭配和语法高亮。既然终端和进程交互是标准输入输出,那标准输出可不可以输出颜色属性呢?

答案是:ANSI转义序列(escape code) 。形如:ESC+[(一般显示为 ^[[)的字符序列可实现在屏幕上定位光标、改变输出字符颜色等功能。例如在bash中, ESC 字符可以用一下三种转义形式输出:

  • \e
  • \033
  • \x1B

我们可以用”[Color Codem” 的形式输出颜色,例如:

1
printf "Default \e[91mLight red"

终端的显示为:

terminal color

同理,所谓的终端提示符颜色,以及各种软件结果的输出,只要加上颜色转义,就会被终端显示为对应的颜色。理论上,我们任何的输出都可以自定义颜色,例如下面的方法,将make日志的不同级别分颜色打印出来。

1
make 2>&1 | sed -e 's/.*\bWARN.*/\x1b[7m&\x1b[0m/i' -e 's/.*\bERR.*/\x1b[93;41m&\x1b[0m/i'

了解到这些,就明白为什么 ls 指令为什么默认没有颜色,但 ls -G 就有颜色了,输出加入颜色转义了嘛。

我们需要配置终端的什么?

下面我列了下,一般我们需要做的配置清单

  • 字体字号

    • 字体:推荐苹果的两代字体 Monaco和Menlo,Monaco第一眼看起来真是惊艳,但普遍反映Menlo更加耐看一点
    • 字号:视个人情况而定,个人偏向15号
    • 在终端配置文件中修改,以osx为例,终端>偏好设置>描述文件>文本>字体
  • 颜色

    • 终端背景颜色或图片

      • 和修改字体一样,在终端配置文件中修改,以osx为例,终端>偏好设置>描述文件>文本>字体
    • 终端颜色支持

      • 这点确实要特别注意,我们使用的终端实际上是伪终端或者叫模拟终端,所以这个软件是可以模拟多种上古终端的,但是不改声明的话,他默认模拟的终端一般都是支持8*2种颜色,那我们其他配置再眼花缭乱也没有,这个终端只能显示16种颜色

        term_config

        所以这个地方一定要声明为256color的!!!

  • prompt提示符

    • prompt提示符是shell解释器的输出,所以需要修改shell解释器配置文件

    • 以bash为例:在~/.bash_profile 中修改PS1环境变量即可。例如:

    • 1
      export PS1="\[\033[36m\]\u\[\033[m\]@\[\033[32m\]\h:\[\033[33;1m\]\w\[\033[m\]\$ "
    • 更详细定制的配置方法网上很多,参见https://linuxconfig.org/bash-prompt-basics

    • 推荐一个可拖动的网站,http://bashrcgenerator.com/

  • editor(vim,emac)

    • 同理,编辑器的输出设置要在编辑器的配置文件中设置,~/.vimrc中 配置

      1
      colorscheme theme
    • vim 自带多种主题可供选择,路径为 /usr/share/vim/vim7x/colors

    • 推荐一个vim的主题网站,http://vimcolors.com/
      主题下载后放入~/.vim/colors中,修改vimrc配置即可使用

  • 常用软件(ls, grep, tmux)

    • 像ls这种常用软件,颜色格式的输出是有必要的,但是每个软件的配置不太一样,ls需要配置LSCOLORS环境变量,而grep的环境变量是GREP_COLOR
  • 别名

    • 这个根据个人习惯,比较通用的比如

      1
      alias ll="ls -hlF"

但是我只想选择

说了这么多,可能对于很多人来说(比如前段开发),并不需要了解这些,只想用的舒服就好了,有没有简单的方式只要点几下就可以做出一个很完善的终端呢?当然可以…

  • 终端

    term_file

    以osx默认终端为例,我们可以看到其实所有配置都在一个叫“描述文件”的选项中,并且,我们可以看到下边的设置,描述文件是可以导出的,那么毫无疑问,github上这些描述文件模板会非常多,总有一款适合你。

    详情参见https://github.com/lysyi3m/osx-terminal-themes

    见到喜欢的主题,下载后双击或者导入都可以。

    • iterm2

      如果你想体验更多的配置选项,已经更多拓展功能,可以尝试iterm2,个人觉得分屏功能很不错,用了以后会觉得tmux有点鸡肋了。

  • 选择zsh

    选择zsh并不是因为zsh从零开始配置比bash容易很多,而是因为zsh的插件系统,导致网上有很多可供选择的模板,让我们轻松定制。比如

    • oh-my-zsh, https://github.com/robbyrussell/oh-my-zsh 只要运行一行安装指令,什么语法高亮,自动补全,不同软件甚至git的配置统统都有了,非常强大。
    • Prezto,https://github.com/sorin-ionescu/prezto 如果你觉得oh-my-zsh功能太多,并且有点卡或者有点慢了,可以试试Prezto,这个可以认为是一个速度更快的oh-my-zsh,但是缺陷是插件没有oh-my-zsh多。

    ​

python2 json的大坑

发表于 2016-06-15   |  

介绍一下背景

最近项目中有一个接口,是通过redis队列做的。我将对方需要的数据通过json 字符串的形式,push到redis list队列中,对方监听并消费(题外话, 我对这种形式的交互有点看法吧,双方既然是接口,但是很难保证格式的统一,比使用rpc框架强验证风险大的多)。

由于对端也是用python做的消费者,所以也是相安无事。随着一个需求的变更,我在自己调试pop的数据发现,我写的json字符串是,酱事儿的:

1
{"title": "\\u6211\\u7231\\u5317\\u4eac\\u5929\\u5b89\\u95e8"}

当时我就懵逼了,这是什么鬼…
很显然这个是unicode字符串嘛,但是我明明就编码成了UTF8啦,怎么最后是这个鬼样子,更奇怪的是对方能正确解码吗?这明明是四不像啊。

我试着自己重现了整个过程。
首先,我将utf8形式的字符串 我爱北京天安门 dumps成json:

1
2
In [10]: json.dumps({"title":"我爱北京天安门"})
Out[10]: '{"title": "\\u6211\\u7231\\u5317\\u4eac\\u5929\\u5b89\\u95e8"}'

果然,确实变成了这个样子,看来是json库搞得鬼。
先不管他,看看这个结果load出来什么样子:

1
2
In [9]: json.loads('{"title": "\\u6211\\u7231\\u5317\\u4eac\\u5929\\u5b89\\u95e8"}')
Out[9]: {u'title': u'\u6211\u7231\u5317\u4eac\u5929\u5b89\u95e8'}

确实load没问题,但是可以看到,最后的结果和我当时dumps的出入蛮大的,由原来的utf8 str形式,变为了unicode形式,就连字典的key也都是unicode了。

好吧,所以现在就有了两个问题。

  • 为什么utf8字符串, json dumps后不是原来形式?
  • 为什么loads回来的数据全是unicode形式?

为什么utf8字符串, json dumps后不是原来形式?

看下官方文档:
python encode
json.dumps方法做的就是将python数据格式按照上图的映射方式转换为json格式。Python str和unicode都可以转换成json 的string形式,我们知道str和unicode差别很大啊,如果一个python字典中,同时有str和unicode的时候,json dump怎么处理呢?试一下:

1
2
In [12]: json.dumps({"title_str":"我爱北京天安门", "title_unicode":u"我爱北京天安门"})
Out[12]: '{"title_unicode": "\\u6211\\u7231\\u5317\\u4eac\\u5929\\u5b89\\u95e8", "title_str": "\\u6211\\u7231\\u5317\\u4eac\\u5929\\u5b89\\u95e8"}'

没有异常,并且都是最后按照unicode的方式统一处理的。看来python是先将str decode为unicode,然后再用unicode进行编码的。

这样本来无可厚非,自己统一好编码格式就行了,loads的时候按照编码的方式,反过来解码。但是问题是,和我们进行交互的人未必也用的python啊,当他用其他的语言对json解码的时候,还原回来就是一堆乱码了,我们能不能让json库,确实编码成utf8形式呢?

官方文档如是说:

If ensure_ascii is True (the default), all non-ASCII characters in the output are escaped with \uXXXX sequences, and the results are str instances consisting of ASCII characters only. If ensure_ascii is False, a result may be a unicode instance. This usually happens if the input contains unicode strings or the encoding parameter is used.

看来是 ensure_ascii 参数为 True 的时候,确保了所有非ASCII字符都转义成 \uXXXX 的ASCII序列。
如果我们设置为False,就可以还原本来面目了吗?试试:

1
2
In [14]: json.dumps({"title_str":"我爱北京天安门"}, ensure_ascii=False)
Out[14]: '{"title_str": "\xe6\x88\x91\xe7\x88\xb1\xe5\x8c\x97\xe4\xba\xac\xe5\xa4\xa9\xe5\xae\x89\xe9\x97\xa8"}'

果然哦,我们干脆看看python json库源码怎么实现的吧, 主要就是下列这个判断

1
2
3
4
200 if self.ensure_ascii:
201 return encode_basestring_ascii(o) # 先将字符串根据encoding参数的编码统一转化为unicode,然后连接字符串
202 else:
203 return encode_basestring(o) # 直接连接字符串

既然ensure_ascii = False时, 没有做类型的转换,所以我们原来是什么,编码后就是什么。但这带来了以下副作用:

  • 如果我们要转换的python数据类型,如果既包含str又包含unicode,在连接字符串的时候肯定会抛出编码异常

    1
    2
    In [13]: json.dumps({"title_str":"我爱北京天安门", "title_unicode":u"我爱北京天安门"}, ensure_ascii=False)
    "UnicodeDecodeError: 'ascii' codec can't decode byte 0xe6 in position 1: ordinal not in range(128)"
  • 如果全部都是unicode进行字符串连接,返回值也是unicode

    1
    2
    In [3]: json.dumps({"title_str":u"我爱北京天安门", "title":u"我爱世界"}, ensure_ascii=False)
    Out[3]: u'{"title": "\u6211\u7231\u4e16\u754c", "title_str": "\u6211\u7231\u5317\u4eac\u5929\u5b89\u95e8"}'
  • 如果全部都是str进行字符串连接,返回值也是str

    1
    2
    In [2]: json.dumps({"title_str":"我爱北京天安门", "title":"我爱世界"}, ensure_ascii=False)
    Out[2]: '{"title": "\xe6\x88\x91\xe7\x88\xb1\xe4\xb8\x96\xe7\x95\x8c", "title_str": "\xe6\x88\x91\xe7\x88\xb1\xe5\x8c\x97\xe4\xba\xac\xe5\xa4\xa9\xe5\xae\x89\xe9\x97\xa8"}'

为了能将json字符串通用的和其他语言交换,我们不得不保证,原始python数据类型必须是统一的。要么全是UTF8的str类型,要么全部是unicode,最后在encode为utf8, 否则就会有异常 这个也是动态类型要付出的代价吧。

为什么loads回来的数据全是unicode形式?

看下官方文档:
python decode
与dumps相反, json.loads 方法做的就是将json数据格式按照上图的映射方式转换为python类型。我们可以看json string 转换回来只有一种格式,那就是unicode,这样就能解释我们看到的现象了,就连dict key都是unicode的。

好麻烦啊,怎么根本的解决这个问题呢?

答: 使用python3

tornado的网络模型

发表于 2015-07-19   |  

在网站逐步发展的过程中,很可能就会遇到C10K问题,python中一个比较流行的解决方式是通过tornado这个web server解决。
tornado是一个非阻塞的Web服务器,下面我会结合源码,对tornado进行一下结构来说明。

基础理论

在这之前,我们需要先弄清楚几个概念,便于我们理解tornado。

  • 同步、异步、阻塞和非阻塞这几个概念的区别和联系。这个问题网上内容很多,可以看下知乎这个问题的讨论,个人认为还是不错的,几个高票的回答从不同角度说明了问题,看过后应该会有比较清晰的认识。
  • epoll的原理。这个是tornado的核心,在网上前人说了很多。我推荐这个[知乎讨论](http://www.zhihu.com/question/20122137),@蓝形参和@张亚伟的回答结合看,应该就能理解大概的原理

核心模块

tornado的核心模块分为三部分:

  • httpserver - 服务于 web 模块的一个非常简单的 HTTP 服务器的实现
  • iostream - 对非阻塞式的 socket 的简单封装,以方便常用读写操作
  • ioloop - 核心的 I/O 循环

我们结合tornado官网的hello world,看看整个过程到底是怎么进行的。
hello_world.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import tornado.httpserver
import tornado.ioloop
import tornado.web
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
if __name__ == "__main__":
application = tornado.web.Application([
(r"/", MainHandler),
])
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(8888)
tornado.ioloop.IOLoop.instance().start()

当我们运行python hello_world.py,先实例化一个tornado.web.Application实例application(这个类并不是核心范畴,所以在这里暂且不做说明,可以简单理解为对数据请求进行url路由,并且生成返回数据的一个handler。在hello_world.py中,我们对根目录url的请求,返回“hello world”内容的数据)。

然后将application作为参数,实例化tornado.httpserver.HTTPServer,赋值给http_server,http_server再调用listen方法。
这个过程发生了什么,我们来看看httpserver.py。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def __init__(self, request_callback, no_keep_alive=False, io_loop=None,
xheaders=False, ssl_options=None):
"""Initializes the server with the given request callback.
If you use pre-forking/start() instead of the listen() method to
start your server, you should not pass an IOLoop instance to this
constructor. Each pre-forked child process will create its own
IOLoop instance after the forking process.
"""
self.request_callback = request_callback
self.no_keep_alive = no_keep_alive
self.io_loop = io_loop
self.xheaders = xheaders
self.ssl_options = ssl_options
self._socket = None
self._started = False
def listen(self, port, address=""):
"""Binds to the given port and starts the server in a single process.
This method is a shortcut for:
server.bind(port, address)
server.start(1)
"""
self.bind(port, address)
self.start(1)

实例化方法除了将application赋值为request_callback外,都是使用的默认值,注意这里io_loop=None。listen方法比较简单,绑定socket然后调用start方法。start方法代码比较多,只看主要逻辑。

1
2
3
4
5
if not self.io_loop:
self.io_loop = ioloop.IOLoop.instance()
self.io_loop.add_handler(self._socket.fileno(),
self._handle_events,
ioloop.IOLoop.READ)

终于出现io_loop,在实例化httpserver的时候io_loop=None,这里将ioloop.IOLoop.instance()赋值给ioloop。值得一提的是ioloop采用单实例模式,所以ioloop.IOLoop.instance()就是返回ioloop的实例。现在跳转到ioloop.py。

1
2
3
4
5
def add_handler(self, fd, handler, events):
"""Registers the given handler to receive the given events for fd."""
self._handlers[fd] = handler
self._impl.register(fd, events | self.ERROR)

可以看到add_handler方法将fd文件描述符和events事件注册到_impl上,这个_impl可以认为是epoll类(根据操作系统不同可能为kqueue),另外将handler赋值给self._handlers[fd],_handlers[fd]相当于文件描述符和回调函数对应的字典,当事件触发的时候会调用。回到httpserver.py,我们调用的是

1
2
3
4
self.io_loop.add_handler(self._socket.fileno(),
self._handle_events,
ioloop.IOLoop.READ)

文件描述符是监听端口的socket,事件READ可以认为I/O是数据读取就绪,回调函数为self._handle_events:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def _handle_events(self, fd, events):
while True:
try:
connection, address = self._socket.accept()
except socket.error, e:
if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
return
raise
if self.ssl_options is not None:
assert ssl, "Python 2.6+ and OpenSSL required for SSL"
connection = ssl.wrap_socket(
connection, server_side=True, **self.ssl_options)
try:
stream = iostream.IOStream(connection, io_loop=self.io_loop)
HTTPConnection(stream, address, self.request_callback,
self.no_keep_alive, self.xheaders)
except:
logging.error("Error in connection callback", exc_info=True)

现在我们可以梳理一下整个过程,httpserver监听一个端口,并把这个文件描述符通过ioloop注册到epoll,只要收到请求,epoll回调_handle_events方法。这个方法做了什么呢?
根据请求数据,创建了一个socket连接connection,然后将这个connection作为文件描述符实例化另一个核心模块iostream,赋值为steam,然后使用steam和self.request_callback(tornado.web.Application实例的application)作为主要参数,实例HTTPConnection。
HTTPConnection负责HTTP协议部分,它的I/O使用iostream,通过iostream read方法读取数据解析数据包,然后调用application生成返回数据,在调用iostream write方法将数据返回。这个过程的I/O事件注册就靠iostream.
我们看下实例iostream的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
read_chunk_size=4096):
self.socket = socket
self.socket.setblocking(False)
self.io_loop = io_loop or ioloop.IOLoop.instance()
self.max_buffer_size = max_buffer_size
self.read_chunk_size = read_chunk_size
self._read_buffer = ""
self._write_buffer = ""
self._read_delimiter = None
self._read_bytes = None
self._read_callback = None
self._write_callback = None
self._close_callback = None
self._state = self.io_loop.ERROR
self.io_loop.add_handler(
self.socket.fileno(), self._handle_events, self._state)
def read_until(self, delimiter, callback):
"""Call callback when we read the given delimiter."""
assert not self._read_callback, "Already reading"
loc = self._read_buffer.find(delimiter)
if loc != -1:
self._run_callback(callback, self._consume(loc + len(delimiter)))
return
self._check_closed()
self._read_delimiter = delimiter
self._read_callback = callback
self._add_io_state(self.io_loop.READ)
def write(self, data, callback=None):
"""Write the given data to this stream.
"""
self._check_closed()
self._write_buffer += data
self._add_io_state(self.io_loop.WRITE)
self._write_callback = callback
def _add_io_state(self, state):
if not self._state & state:
self._state = self._state | state
self.io_loop.update_handler(self.socket.fileno(), self._state)

可以看到在实例iostream的时候,我们就将返回给客户端的文件描述符注册到ioloop上,但是事件是ERROR。在write和read_until方法中,都调用了_add_io_state方法,这个方法负责更新对应文件描述符的注册事件。

现在我们来看看tornado所谓的单线程主要的任务调度逻辑,ioloop中start方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def start(self):
...
while True:
...
try:
event_pairs = self._impl.poll(poll_timeout)
except Exception, e:
# Depending on python version and IOLoop implementation,
# different exception types may be thrown and there are
# two ways EINTR might be signaled:
# * e.errno == errno.EINTR
# * e.args is like (errno.EINTR, 'Interrupted system call')
if (getattr(e, 'errno') == errno.EINTR or
(isinstance(getattr(e, 'args'), tuple) and
len(e.args) == 2 and e.args[0] == errno.EINTR)):
logging.warning("Interrupted system call", exc_info=1)
continue
else:
raise
...
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
self._handlers[fd](fd, events)
except (KeyboardInterrupt, SystemExit):
raise
except (OSError, IOError), e:
if e[0] == errno.EPIPE:
# Happens when the client closes the connection
pass
else:
logging.error("Exception in I/O handler for fd %d",
fd, exc_info=True)
except:
logging.error("Exception in I/O handler for fd %d",
fd, exc_info=True)

在这里 event_pairs = self._impl.poll(poll_timeout),陷入epoll,然后while self._events之后的代码,运行触发事件后回调函数。
现在我们通过一张图看看整个流程:
tornado-httpserver图

还有什么

我们看到在tornado中,无论运行什么库,只要涉及I/O,都要注册到ioloop上,这样才能发挥异步I/O的作用,否则tornado也会阻塞。所以tornado会有很多第三方库,所以在实际使用中,我们有必要学习一下第三方库的使用。

python的协程

发表于 2015-07-06   |  

关于python的协程,网上资料还是挺多的,这里说一下我的理解吧。

什么是协程?

先看一下wiki的定义吧

Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.

这个定义不太容易理解。一句话可能很难直观的说明协程这个概念,通俗讲,协程是由一系列的子程序协同完成一个任务,这些子程序可以主动挂起交出控制权,当恢复执行的时候,可以从挂起的位置继续执行,而这一切的调度由用户操作,而不是操作系统。所以有人称,协程是用户态线程。
协程的实现并不与操作系统相关,是语言相关的,所以可以看到主流的一些语言都有协程的实现,包括java,go等。在python中,协程是通过生成器实现的,yeild就可以保存当前子程序上下文,并交出控制权,使用send就可以传递数据并恢复相应子程序。这样多个生成器子程序,就可以通过yield和send相互协作完成任务。

python的协程和生成器的关系

说到这里,可能会产生疑问,使用yield的函数不是生成器么?生成器就是协程么?确实如此,参见PEP 342,在python 2.5以前生成器就是仅仅是迭代器函数,可以生成无限列表。但是yield保存上下文,主动交出控制权的特性已经很接近协程了,所以在python 2.5对生成器进行了几个改进:

  • yield从语句变为表达式, 这个是为了传值方便
  • 加入send()方法用于在恢复生成器的时候,传入值
  • 加入close()方法用于结束协程
  • 加入throw()方法用于传入异常
    加入了send()和throw()方法,我们就可以在协程恢复的时候,传入值或者异常。有了这些特性,python从语言上就支持基本的协程功能了,当然对不同协程的控制,还要用户自己来编写。所以我们可以说,从python 2.5以后的生成器才可以用于作为协程。

与greenlet,gevent的关系

介绍完python在语义上对协程的支持,但实际使用中会发现,很少有用生成器方式的协程,一般用greenlets,gevent这样的package代替。为什么会这样呢?主要还是因为python2.x对协程的支持有限,要支持复杂的应用比较困难,但是在python3以后,协程会更加好用,我们看看python对协程的支持历程:

Implementations for Python

  • Python 2.5 implements better support for coroutine-like functionality, based on extended generators (PEP 342)
  • Python 3.3 improves this ability, by supporting delegating to a subgenerator (PEP 380)
  • Python 3.4 introduces a comprehensive asynchronous I/O framework as standardized in PEP 3156, which includes coroutines that leverage subgenerator delegation
  • Python 3.5 introduces explicit support for coroutines with async/await syntax (PEP 0492).

到Python 3.5都已经有明确的异步操作方法了,但是这些都是2.x所不具备的。所以在python2.x时代,就需要其他实现方式作为补充。
greenlet就是这样一个库,它是从stackless python中剥离,支持CPython的版本的一个协程模块, 相当于python协程的增强版。在github上有使用greenlet重新实现生成器的demo,大家可以体会一下greenlet的特性。但是和python 生成器协程一样,greenlet也没有控制调度的功能,如果要实现一个非阻塞的操作,还要自己实现控制调度逻辑,这就催生了gevent的产生。

gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.
Features include:

  • Fast event loop based on libev (epoll on Linux, kqueue on FreeBSD).
  • Lightweight execution units based on greenlet.
  • API that re-uses concepts from the Python standard library (for example there are Events and Queues).
  • Cooperative sockets with SSL support »
  • DNS queries performed through threadpool or c-ares.
  • Monkey patching utility to get 3rd party modules to become cooperative »

gevent在greenlet基础上结合libev作为事件循环,补充了协程要自己写异步调度的空缺,最大化了协程的性能。在此基础上,还提供了多种API,方便开发,甚至直接提供了一个支持协程WSGI server,bottle就支持了这个特性。另外值得一提的是它的Monkey patch,可以无缝将python标准库中阻塞的API包装成非阻塞的,这一特性大大提高了gevent的应用率。

协程带来的改变

面向对象是和现实世界构成的形式一致的,但是不同对象之间的交互,还是采用调用的关系。调用关系隐含的的是主从关系,但现实世界,很多关系的协作是对等的,比如生产者和消费者。协程就是在计算机程序设计中对这种现实反映的实现。
下面我们结合程序简要说明协程的几种应用:

  • 无限列表
    假设一种情形,我们需要所有的斐波那契数列,如果不用协程基本上实现不了吧。协程实现就很方便:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    def fib():
    first, second = 0, 1
    yield first
    yield second
    while True:
    third = first + second
    yield third
    first = second
    second = third
  • 管道
    如果我们想用python实现管道怎么做呢,答案也是协程。下面这个例子来自于python参考手册,打印指定目录下,满足格式文件中,有“python”关键字的行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    import os
    import fnmatch
    import gzip, bz2
    def coroutine(func):
    """自动调用协程的next()函数"""
    def start(*args, **kwargs):
    g = func(*args, **kwargs)
    g.next()
    return g
    return start
    @coroutine
    def find_files(target):
    while True:
    topdir, pattern = (yield)
    for path, dirname, filelist in os.walk(topdir):
    for name in filelist:
    if fnmatch.fnmatch(name, pattern):
    target.send(os.path.join(path, name))
    @coroutine
    def opener(target):
    while True:
    name = (yield)
    if name.endswith('.gz'): f = gzip.open(name)
    elif name.endswith('.bz2'): f = bz2.BZ2File(name)
    else: f = open(name)
    target.send(f)
    @coroutine
    def cat(target):
    while True:
    f = (yield)
    for line in f:
    target.send(line)
    @coroutine
    def grep(pattern, target):
    while True:
    line = (yield)
    if pattern in line:
    target.send(line)
    @coroutine
    def printer():
    while True:
    line = (yield)
    sys.stdout.write(line)
    finder = find_files(opener(cat(grep('python', printer))))
    finder.send('www', 'access-log*')
    finder.send('otherwww', 'access-log*')
  • 并发
    关于并发,最好的例子应该是gevent吧,大家有兴趣可以看下源码。基本的原理是,将函数变为协程,每触发I/O阻塞就yield交出控制权,并将事件注册到epoll,当I/O就绪就是用send方法,传入I/O数据,并恢复逻辑。这么描述其实和tornado、nodejs的网络模型很像,但是协程对于程序员更加友好。tornado和nodejs默认还是通过回调函数完成这个事件循环的,这样代码并不直观,但使用协程可以用同步的方式完成回调函数的工作。

celery有什么难理解的?

发表于 2015-07-04   |  

两年前,公司所有的管理后台用的都是django admin,我们遇到一个复杂的需求,需要从django admin调用执行一个大批量处理脚本,并且可以查看执行状态。
当时不太了解celery,加上时间紧迫,就没敢冒险。简单说说我们当时是怎么做的,我们把这个脚本做成了command line的形式,django使用python的subprocess调用这个命令,为了满足可以查看状态这个需求,我们单独创建了一个任务表,在启动脚本时先插入数据,完成或者异常都更新这条数据状态,这样在django admin里就能看到执行状态了。
这个方法满足了需求,并沿用到现在,但是这里有一个显而易见的硬伤:django必须和这个脚本在同一个服务器上。好吧,毫无可拓展性。
其实这个就是一个天然celery使用场景,如果使用celery就可以轻松解决分布拓展问题。

什么是celery?

说了这么多,什么是celery呢?官网定义:

Celery is an asynchronous task queue/job queue based on distributed message passing.

celery是一个基于分布消息传递的异步任务队列。定义很简单,但是这里隐含了一个条件就是celery不是单独存在的,它一定需要建立在一个分布的消息传递机制上,这个消息传递机制就是celery文档里常说的broker。我认为这是第一个难以理解的点。
一般情况下,一个工具库或者一个框架都是独立的,有自己的feature或者功能点,可能依赖其他的库,但绝不依赖于其他服务。但是celery是一个特例,如果celery没有broker这个服务,那就完全不能用了。这就是为什么现在网络上大多数celery文章都是和rabbitmq或者redis一起讲的。
清楚这点,知道了broker和celery的关系,就不会有rabbitmq不就可以做任务队列吗?为什么和celery结合?这样的疑问。事实上,在官网上就有使用rabbit作为任务队列的实现,多种语言都有。rabbitmq是消息代理中间件,具体应用到什么场景,怎么用,用什么语言,都可以自己定义,celery也实现了这个接口而已。

celery结构框架

下面贴一个celery+rabbitmq的结构图,个人认为挺能说明问题:
celery rabbitmq结构图
celery隐藏了rabbitmq接口的实现细节,既充当了publisher(client)又充当了consumer (worker)的角色。
这是第二个难理解的点,这个困扰可能来源于celery提供的get start文档,如果按照这个文档一步步走,确实能走通流程。但是这个通俗的demo有一个问题,就是他的publisher和consumer都在一台server上,并且client调用具体任务的方式是通过import。这让人很难理解,为什么说celery是分布的呢?但是奇怪的是,官方demo就没有publisher和consumer分布在两台server的例子。

client broker worker关系图
如上图所示,producer、broker、consumer之间的网络拓扑关系可以有这5种情形。

celery到底做了什么?

上文说了,rabbitmq官网就有demo做任务队列的,那要celery有何用呢?还有上边,我提到的那个需求,没用celery,虽然拓展性不好,但是改一下用一个web服务代替不就行了么?这么说没错,但都是重复造轮子。
思考一下,如果我们用rabbitmq自己实现任务队列,有一天我们不想用rabbit了怎么办?我们换个思维,如果没有celery,让你自己设计一个异步任务队列你怎么做。首先,要有一个发起任务的client,选定一定保存任务信息的媒介,由一个worker去一直监听这个信息媒介,这个worker最好是多进程的,另外可以兼容尽可能多得信息媒介。好吧,这个不就是celery所做的事儿么,celery兼容多个broker,既是任务发起者又是执行者,另外支持多进程…还有好多通用功能考虑。
看一下这个图体会一下celery+rabbitmq的整个工作流程:

Celery_RabitMQ_Diagram

我是怎么用的?

说了这么多,来看看我是怎么用的。

  1. 选择broker和result backend
    知道了原理,这个选择要容易些,对于消息中转,没有比rabbitmq更灵活健壮的了,至于对result backend的存储,仁者见仁。rabbitmq安装配置参考

  2. 提取celery配置文件
    把共有配置文件提出来,进行维护,比分别维护producer和consumer的配置统一容易的多。
    celeryconfig.py

    1
    2
    3
    4
    5
    BROKER_URL = 'amqp://test01:password@192.10.2.156//'
    CELERY_RESULT_BACKEND = 'amqp://test01:password@192.10.2.156//'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
  3. consumer(server)端开发

    • 在项目目录下创建app.py文件

      1
      2
      3
      4
      5
      from celery import Celery
      app = Celery('tqlib', include=['tq.tasks'])
      app.config_from_object('celeryconfig')
      if __name__ == '__main__':
      app.start()
    • 创建tasks.py

      1
      2
      3
      4
      from tq.app import app
      @app.task
      def test(no):
      print no
    • 启动多进程服务
      项目目录结构为

      tq----app.py
        |---tasks.py
        |---celeryconfig.py
      

      启动指令为:

      1
      celery multi restart ccworker --app=tq.app -l info
  4. producer(client)端开发

    1
    2
    3
    4
    from celery import Celery
    celery = Celery()
    celery.config_from_object('celeryconfig')
    celery.send_task('tq.tasks.test', ("hello world",))

    注意,就是celery.send_task()这个方法解决了producer和consumer的网路拓扑传递数据问题。

sqlalchemy如何分表

发表于 2015-06-29   |  

背景

话说sqlalchemy真是一个非常好用的库,python orm基本上是舍我其谁了,文档还非常全面,基本上没有什么硬伤,现在也冲出了1.0版本,未来更加值得期待。
我最早用django orm,不过很快就觉得很多功能不够用,我当时用的版本是1.3.1,没有bulk insert也没有锁,没有这两个功能,好多应用就没法用django开发了。之后开始接触sqlalchemy,一直用到现在,总的体会是只有你想不到没有它做不到。

我们项目里有一个需求,就是数据按月分表,比如:2014年6月数据就存在record_201406表中, 其他月数据按此方法类推。这个需求如果是用sqlalchemy来获取数据,我们怎么做呢?

一般方法有什么问题?

一般情况下,我们很自然想到使用如下方法:

1
2
3
4
5
class RecodeDao_201406(Base):
__tablename__ = 'record_201406'
id = Column(INT(11), primary_key=True)
...

或者简化点:

1
2
3
4
class RecodeDao_201406(Base):
__table__ = Table('record_201406',
Base.metadata, autoload=True)

这样实现确实没问题,但回到需求上,既然是按月分表,难不成我要每个月写一个这样的model?每月上次线?当然不行,那我们怎么解决呢?

官网解决方法,有什么问题?

有经验的同学可能发现,这个不就是水平sharding么?这么说不完全对,看一下sharding的wiki定义:

A database shard is a horizontal partition of data in a database or search engine. Each individual partition is referred to as a shard or database shard. Each shard is held on a separate database server instance, to spread load.

我们这个需求只涉及单数据库,就不算sharding了,可以称为partitioning(分区),然而强大的sqlalchemy这两个情况都考虑到了,并且官网都提供了example,我们挑对应场景的partitioning出来看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class TBase(object):
"""Base class is a 'mixin'.
Guidelines for declarative mixins is at:
http://www.sqlalchemy.org/docs/orm/extensions/declarative.html#mixin-classes
"""
id = Column(Integer, primary_key=True)
data = Column(String(50))
def __repr__(self):
return "%s(data=%r)" % (
self.__class__.__name__, self.data
)
class T1Foo(TBase, Base):
__tablename__ = 't1'
class T2Foo(TBase, Base):
__tablename__ = 't2'
timestamp = Column(DateTime, default=func.now())
engine = create_engine('sqlite://', echo=True)
Base.metadata.create_all(engine)
sess = sessionmaker(engine)()
sess.add_all([T1Foo(data='t1'), T1Foo(data='t2'), T2Foo(data='t3'),
T1Foo(data='t4')])
print sess.query(T1Foo).all()
print sess.query(T2Foo).all()

使用了继承的方法,抽象的好,但我们之前的问题解决了吗?没有。还是需要预定义好所有表的model类,才能正确使用,迫不得已,我们只能自己想办法了。

函数方法解决

经过一番探索,我得出了如下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class_registry = {}
DbBase = declarative_base(bind=engine, class_registry=class_registry)
def get_model(modelname, tablename, metadata=DbBase.metadata):
"""
args:
modelname:新model名,string类型
tablename:数据库中表名
usage:
RecordDao = get_model("RecordDao_201406", "record_201406")
"""
if modelname not in class_registry:
model = type(modelname, (DbBase,), dict(
__table__ = Table(tablename, metadata, autoload=True)
))
else:
model = class_registry[modelname]
return model

每次想获取对应月表数据的model,调用get_model方法即可。这个方法一直沿用到现在,虽然有点丑陋,但却是解决了以上问题。直到sqlalchemy 0.9.1版本推出Automap

Automap方法

sqlalchemy文档完备,具体可点击Automap,它可以自动映射数据库的表,通过数据表名映射model,简单直接,实现起来如下:

1
2
3
4
5
6
7
8
from sqlalchemy.ext.automap import automap_base
AutoBase = automap_base()
# reflect the tables
AutoBase.prepare(engine, reflect=True)
tablename = "record_201406"
RecordDao = getattr(AutoBase.classes, tablename)

这样就可以了,很清晰。但是这个方法有一个缺点,Automap的映射虽然是自动的,但是只有在启动的时候生效,也就是说如果新建一个数据表,而没有告诉Automap,那这个表是找不到的。在实际使用中,可以捕获AttributeError异常,并再次调用AutoBase.prepare(engine, reflect=True) 刷新映射关系。

我为什么写博客?

发表于 2015-06-17   |  

我以前也有写技术博客的习惯,在csdn上,解决一个问题写一篇,有些工具的使用方式也放上边,比较有规律。

后来换了工作,开始比较忙,就扔下了,等闲下来想写,又觉得不好意思了。那时候眼界宽了,看过很多大牛写的文章,觉得自己写的这么浅显,有点拿不出手,想厚积薄发再写。然后就到了现在,对于写博客来说又有了新认识,所以又准备重新捡起这个习惯。

首先,自己确实有刚需。有的技术问题虽然解决了,但是下次遇到了,细节问题又要重新检索,时间成本高,最好还是找个地方记录。另外,在检索信息的过程中,正是很多博客的内容给了我很多指引,解决了问题,写了博客也可以帮助他人,也算是回馈技术圈的一种方式。写博客还有一种好处就是加深认识,提高表述能力。平时对很多问题确实是有了新的理解,但让我写出来,讲给他人,那还得再深思熟虑一番,这个过程对博主是很有帮助的。这段时间深有体会,才想起来还是要再写博客,至于写得是不是有独到见解,是不是拿得出手,其实也没那么重要,都是自己成长的过程。

本篇作为新博客第一篇,既为了开篇,也为了纪念。至于为什么用了githup page + hexo,确实是为了可定制、免费、省心。另外NexT的主题满足了我的所有需求,如果你喜欢我的博客样式,可以尝试一下。

Leo Shang

Leo Shang

永远年轻, 永远热泪盈眶

7 日志
4 标签
RSS
© 2016 Leo Shang
由 Hexo 强力驱动
主题 - NexT.Mist