gevent的异步方式

介绍

gevent是基于协程的高性能Python网络库,相比Twisted、Stackless等,gevent使用libev事件循环,因此速度很快、性能很好,使用greenlet提供高层的同步API,因此非常轻量。
作者曾经是另一个Python网络库Eventlet的开发者之一,2009年因 为Eventlet无法满足项目需要,他决定另外开发一个更轻量的库,这就是gevent。某种意义上,gevent是从Eventlet项目派生出来的。

异步方式

  • gevent对标准I/O函数做了猴子补丁,把它们变成了异步。
  • 并且gevent有greenlet对象能够被用于并发执行。

greenlet

greenlet是一种协程。gevent的调度器在I/O等待期间使用一个事件循环在所有的greenlets间来回切换,而不是用多个CPU来运行它们。
gevent通过使用wait函数来设法尽可能透明化地处理事件循环。wait函数将启动一个事件循环,直到所有的greenlets结束。
greenlet由gevent.spawn(func, para)来创建,func是需要执行的函数,para是func的参数。一旦声明的函数执行完成,它的值就会包含在greenlet的value域中。

猴子补丁

Monkey Patch就是在运行时对已有的代码进行修改,达到hot patch的目的。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
print('GET: %s' % url)
headers = {'User-Agent': 'Mozilla/5.0'}
r = requests.get(url, headers = headers)
print('%d bytes received from %s.' % (r.status_code, url))


gevent.joinall([
gevent.spawn(f, 'https://www.baidu.com'),
gevent.spawn(f, 'https://www.sina.com'),
gevent.spawn(f, 'http://www.douban.com')
])

结果:

1
2
3
4
5
6
7
(venv2) E:\github\concurrent>python gevent2.py
GET: https://www.baidu.com
GET: https://www.sina.com
GET: http://www.douban.com
200 bytes received from https://www.baidu.com.
200 bytes received from http://www.douban.com.
200 bytes received from https://www.sina.com.

其他

在部署web app的时候,用一个支持gevent的WSGI服务器,立刻就获得了数倍的性能提升。

tornado的异步方式

介绍

Tornado是Facebook主要为HTTP客户端和服务器端开发的异步I/O包。Tornado主要应对的是I/O密集型应用,Tornado可以使用协程和回调两种方式。
Tornado 优秀的大并发处理能力得益于它的 web server 从底层开始就自己实现了一整套基于 epoll 的单线程异步架构(其他 python web 框架的自带 server 基本是基于 wsgi 写的简单服务器,并没有自己实现底层结构)。

epoll

ioloop 的实现基于 epoll ,那么什么是 epoll? epoll 是Linux内核为处理大批量文件描述符而作了改进的 poll 。
那么什么又是 poll ? 首先,我们回顾一下, socket 通信时的服务端,当它接受( accept )一个连接并建立通信后( connection )就进行通信,而此时我们并不知道连接的客户端有没有信息发完。 这时候我们有两种选择:

  1. 一直在这里等着直到收发数据结束;
  2. 每隔一定时间来看看这里有没有数据;

第二种办法要比第一种好一些,多个连接可以统一在一定时间内轮流看一遍里面有没有数据要读写,看上去我们可以处理多个连接了,这个方式就是 poll / select 的解决方案。 看起来似乎解决了问题,但实际上,随着连接越来越多,轮询所花费的时间将越来越长,而服务器连接的 socket 大多不是活跃的,所以轮询所花费的大部分时间将是无用的。为了解决这个问题, epoll 被创造出来,它的概念和 poll 类似,不过每次轮询时,他只会把有数据活跃的 socket 挑出来轮询,这样在有大量连接时轮询就节省了大量时间。

tornado.ioloop

ioloop 实际上是对 epoll 的封装,并加入了一些对上层事件的处理和 server 相关的底层处理。

协程

1
2
3
4
5
6
7
8
from tornado import gen

@gen.coroutine
def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = yield http_client.fetch(url)
#为了返回值,python2中需要生成一个特殊的异常;在python3.3以后直接用return
raise gen.Return(response.body)
  • @gen.coroutine 和 yield 搭配使用,代表的是协成。
  • yield之后挂起耗时操作,耗时操作完成时从挂起的位置接着往下执行。
  • Tornado的协成是由python的generators来支持的。

回调

1
2
3
4
5
6
7
from tornado.httpclient import AsyncHTTPClient

def asynchronous_fetch(url, callback):
http_client = AsyncHTTPClient()
def handle_response(response):
callback(response.body)
http_client.fetch(url, callback=handle_response)
  • http_client第二个参数callback指定了回调函数。
  • 多重的显式的回调容易引起“回调地狱”。

刷票

给小朋友投票,拉微信朋友圈的朋友点赞什么的好麻烦啊。索性把分享的页面url拉到浏览器上研究一下,发条http post把data拿出来,直接post到目标地址不就ok了。开搞。

安装requests

1
pip install requests

主要代码

1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import requests

get_cookie=requests.get("https://aa.rrxiu.cc/v/4uj43w?vt=1&from_code=3c2953a3680daf86bfdb76cef13d2572&guid97=d8c3e2e1aee2be5b35e571711865189f-162498&from=timeline&isappinstalled=0")
data={'wsiteGuid':'4uj43w','wxOpenId':'','nickName':'','headimgurl':'','guid':'d8c3e2e1aee2be5b35e571711865189f','id':'162498'}

for i in range(100):
r = requests.post('https://res2.rrxiu.net/pluginPhotoVote/photoVoteData/vote',cookies=get_cookie.cookies,data=data)
print(r.text)

celery介绍

Celery架构设计

Celery是一个用python编写的分布式的任务调度模块,它有着简明的 API,并且有丰富的扩展性,适合用于构建分布式的 Web 服务。celery的架构包括三个部分:消息中间件(message broker),任务执行单元(worker)和任务执行结果的存储(task result store)。

消息中间件

celery本身不提供消息服务,但可以和第三方提供的消息中间件(RabbitMQ,Redis等)集成。

任务执行单元

worker是celery提供的任务执行单元,worker并发的运行在分布式的系统节点中。

任务结果存储

task result store用来存储worker执行的任务的结果,celery支持以不同方式存储任务的结果(AMQP,Redis,MongoDB)。

另外,celery还支持不同的并发(Prefork,Eventlet,gevent,threads)和序列化的手段(pickle,json,yaml等)。

概图

celery

安装

1
2
pip install celery
pip install redis

应用

消息中间件redis的配置,及任务结果存储

1
2
3
4
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# 使用 update设置多个celery的参数
celery.conf.update(app.config)

消费者task

1
2
3
4
5
6
7
8
9
10
11
12
@celery.task(bind=True)
def long_task(self):
# 执行耗费资源的算法
algorithm.reset_data()

app = create_app(os.getenv('FLASK_CONFIG') or 'default')
with app.app_context():
front_data = algorithm.run_search(algorithm.TestData.requirement, algorithm.TestData.disabled_devices)
import json

data = json.dumps(front_data)
return {'current': 100, 'total': 100, 'status': 'Task completed!', 'result': data}

通过task.apply_async将任务压入消息队列broker中

1
2
3
4
5
6
7
8
@energy_island.route('/get_graph_data', methods=['POST'])
@login_required
def get_graph_data():
id = request.values.get('id')
print(id)

task = tasks.long_task.apply_async()
return jsonify({}), 202, {'Location': url_for('energy_island.taskstatus', task_id=task.id)}

使用backend异步获取结果task.AsyncResult

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@energy_island.route('/status/<task_id>')
def taskstatus(task_id):
task = tasks.long_task.AsyncResult(task_id)
if task.state == 'PENDING':
response = {
'state': task.state,
'current': 0,
'total': 1,
'status': 'Pending...'
}
elif task.state != 'FAILURE':
response = {
'state': task.state,
'current': task.info.get('current', 0),
'total': task.info.get('total', 1),
'status': task.info.get('status', ' ')
}
if 'result' in task.info:
response['result'] = task.info['result']
else:
response = {
'state': task.state,
'current': 1,
'total': 1,
'status': str(task.info),
}
return jsonify(response)

生产者get消费者完成的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var update_progress = function(status_url) {

// send GET request to status URL
$.getJSON(status_url, function(data) {
// update UI
percent = parseInt(data['current'] * 100 / data['total']);
if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
if ('result' in data) {

setGraphDataRun(JSON.parse(data['result']), 1);
// show result
alert('result-liubing: ' + data['result']);
}
else {
alert('state-liubing: ' + data['state']);
}
}
else {
// rerun in 2 seconds
setTimeout(function() {
update_progress(status_url);
}, 2000);
}
});
}

启动celery

1
celery -A app.celery worker -l info -P eventlet

利用AppVeyor做GithubIO的CI

建立两个仓库

我基于hexo搭建的github.io,在更换电脑时无法随意的提交blog。(因每个机器都需要部署node+hexo的环境)
于是,我将我的github.io建立两个仓库。

  • master仓库(hexo generate后生成的html等文件)
  • hexo仓库(原始文件,包含md文档及主题等)

并将hexo仓库设置为这个repo的主仓库。

AppVeyor

appveyor是支持windows OS做CI的持续集成工具,可以使用GitHub账号登陆。绑定我的github.io项目。

添加appveyor.yml到Source Repo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
clone_depth: 5

environment:
access_token:
secure: [Your Github Access Token]

install:
- ps: Install-Product node 6 #因为我的hexo是3.71的版本,因此需要安装node 6
- node --version
- npm --version
- npm install
- npm install hexo-cli -g


build_script:
- hexo generate

artifacts:
- path: public

on_success:
- git config --global credential.helper store
- ps: Add-Content "$env:USERPROFILE\.git-credentials" "https://$($env:access_token):x-oauth-basic@github.com`n"
- git config --global user.email "%GIT_USER_EMAIL%"
- git config --global user.name "%GIT_USER_NAME%"
- git clone --depth 5 -q --branch=%TARGET_BRANCH% %STATIC_SITE_REPO% %TEMP%\static-site
- cd %TEMP%\static-site
- del * /f /q
- for /d %%p IN (*) do rmdir "%%p" /s /q
- SETLOCAL EnableDelayedExpansion & robocopy "%APPVEYOR_BUILD_FOLDER%\public" "%TEMP%\static-site" /e & IF !ERRORLEVEL! EQU 1 (exit 0) ELSE (IF !ERRORLEVEL! EQU 3 (exit 0) ELSE (exit 1))
- git add -A
- git commit -m "Update Static Site" && git push origin %TARGET_BRANCH% && appveyor AddMessage "Static Site Updated"

在GitHub生成好Access Token之后,你需要到AppVeyor加密页面把Access Token加密之后再替换[Your GitHub Access Token]。
github-access-token
github-webhook

设置Appveyor的settings

appveyor-setting1
在Appveyor Settings的Environment里设置以下四个变量。STATIC_SITE_REPO就是github Repo的地址,TARGET_BRANCH是Repo的目标branch(这里是master,相当于提交编译后的html等文件至master),GIT_USER_EMAIL和GIT_USER_NAME就是你GitHub账号的信息。
appveyor-setting2
在Appveyor Settings的build里默认是MSBUILD,因为是appveyor.yml中配置的脚本来build的,因此这里改成SCRIPT。
appveyor-setting3

整体逻辑

本地在hexo的分支下写md文档,提交至github上,触发AppVeyor的webhook,AppVeyor会git clone下hexo的最新代码,并且build成目标文件,将目标文件push至master分支。则完成了整体刷新页面,持续部署的工作了。
可以在线监控build的状况:
appveyor-build-log

git删除所有提交历史记录

checkout

1
git checkout --orphan latest_branch

添加所有文件

1
git add -A

提交变更

1
git commit -am "something about message"

删除分支

1
git branch -D master

重命名当前branch为目标名

1
git branch -m master

强制push

1
git push -f origin master

centos7安装postgresql

安装启动postgresql

1
2
3
yum install postgresql-server
service postgresql initdb
service postgresql start

修改管理员密码

1
2
3
su - postgres
psql
$ ALTER USER postgres WITH PASSWORD 'postgres';

配置远程访问

修改**/var/lib/pgsql/data/postgresql.conf**

1
listen_addresses = '*'

修改客户端认证配置文件**/var/lib/pgsql/data/pg_hba.conf**,添加

1
host  all  all   10.0.0.0/8  md5

重启服务

1
service postgresql restart

jenkins自动部署docker应用

项目背景

目前需要部署一个基于python flask的web服务,数据库使用的是postgresql。

思路&流程

  • 准备docker镜像
  • jenkins拉取远端源码–git
  • 实现应用打包–jenkins本地
  • 把应用打包进docker镜像–dockerfile
  • 镜像同步到docker私有仓库–shell docker命令
  • 删除老的docker容器–shell docker命令
  • 运行新的docker容器–shell docker命令

准备docker镜像

创建自定义flask镜像

基于tiangolo/uwsgi-nginx-flask创建flask的镜像。
自定义flask镜像的dockerfile:

1
2
3
4
5
FROM tiangolo/uwsgi-nginx-flask:python2.7

COPY ./app /app
COPY ./lib/libseuif97.so /usr/lib
RUN pip install -r requirements.txt

因为网络环境不好,安装requirements.txt中python的第三方库时,发生报错,因此我将基本的库事先安装好。
并且把需要的so静态文件也COPY进镜像中。
后面如果有变化,可以根据需要在jenkins中再动态生成dockerfile并执行。

1
docker build -t test-flask .

创建自定义的postgresql

自定义postgresql镜像的dockerfile:

1
2
3
FROM postgres:9.3

ADD ./sql /docker-entrypoint-initdb.d/

我们有一些master表,以及基础数据,我们需要在运行这个容器之前,将这些基础数据insert进db这个docker之中。
可以把各种sql文件放入/sql路径下。

commit到私有docker仓库

1
2
3
docker commit -m "flask插件安装" -a "liubing" $containId liubing/test-flask
docker tag liubing/$imageName $dockerRegistsryAddress/test-flask
docker push $dockerRegistsryAddress/test-flask

db的docker也可提前commit到私有仓库。

jenkins拉源码

在jenkins中配置“源码管理”,输入source仓库的git地址,并绑定git用户及需要检测状态变化的branch。在构建时会自动下载git源码的。

配置git地址

实现应用打包

目前开发的是一个python项目,python不需要打包。源码即可执行。

把应用打包进docker镜像

1
2
3
4
5
6
echo 'From $dockerRegistsryAddress/test-flask
MAINTAINER liubing "lbingg@hotmail.com"

COPY . /app

' > Dockerfile;

jenkins中执行的脚本会默认当前脚本处于jenkins环境中的workspace中的当前应用工程下。因此在copy程序时,我们需要将当前目录下的所有文件全部拷贝入新创建docker镜像的/app目录中。

镜像同步到docker私有仓库

1
docker build -t $dockerRegistsryAddress/test-flask;

删除老的docker容器

1
2
3
4
docker stop postgresql || true;
docker rm postgresql || true;
docker stop test-flask || true;
docker rm test-flask || true;

运行新的docker容器

1
2
3
docker run --name=postgresql -itd --restart always --publish 5432:5432 --volume /opt/postgresql/data:/var/lib/postgresql --env 'DB_USER=postgres' --env 'DB_PASS=postgres' --env 'DB_NAME=postgres' $dockerRegistryAddress:5000/test-db-1;

docker run --name test-flask --publish 80:80 --link postgresql:postgres -d $dockerRegistryAddress:5000/test-flask;

jenkins中的“构建”配置

配置构建

问题点

需要在jenkins的docker中运行其他的docker命令,可以使用Docker outside of Docker来配置。
Docker-outside-of-Docker

  • Copyrights © 2015-2021 小白兔
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信