[toc]
apscheduler部署方案
前言
下面所讲的apscheduler多进程,指的是使用flask_apscheduler与apscheduler结合起多个worker就起多个apscheduler,如果直接使用python自带的werkzeug启动或者单进程启动会少很多问题,这样的结果是并发量是有限制的。
gunicorn
apscheduler多进程
workers = 16
这种方案会导致起多个apscheduler,如果用锁,如文件锁控制只起一个的话,请求不落在启动的worker上则timer会处于pending状态
apscheduler多进程优化
workers = 16
--preload
# 只在命令有效,如果配置在配置文件中不生效
这样实际上也只是起一个worker,而且gunicorn只能用prefork模式,这样对apscheduler会有影响。
总结
import multiprocessing
bind = '0.0.0.0:8203' #绑定的ip及端口号
#workers = multiprocessing.cpu_count() * 2 + 1 #进程数
workers = 16
backlog = 2048 #监听队列,允许挂起的链接数
worker_class = "eventlet"
debug = True
chdir = '/app/microservice/timer_service/timer_app/' #你项目的根目录,比如我的app.py文件在/home/ubuntu/app目录下,就填写'/home/ubuntu/app'
proc_name = 'gunicorn.proc'
daemon = True
pidfile = "/app/microservice/timer_service/deploy_config/gunicorn.pid"
reload=True
accesslog = "/app/microservice/timer_service/deploy_config/access.log"
errorlog = "/app/microservice/timer_service/deploy_config/error.log"
threads = 32
uwsgi
选择uwsgi一个很重要的原因是gunicorn只能是prefork模式,这种模式对apscheduler的影响后面会讲。
使用enable-threads启动,因为apscheduler放在后台的话,是以线程启动的
apscheduler多进程
不支持即便配置lazy-apps=true
,现象与gunicorn.apscheduler多进程 一样
总结
[uwsgi]
http=:8203
projectpath = /app/microservice/timer_service
pythonpath = %(projectpath)/timer_app
wsgi-file = run.py
module = run
callable = app
processes = 8
threads = 64
enable-threads = true
preload=True
lazy-apps=true
master = false
pidfile = %(projectpath)/deploy_config/pid_%n.pid
#mule=init.py
#daemonize = %(projectpath)/logs/uwsgi_log.log
解决方案
问题
gunicorn由于prefork会导致暂停至超时再次恢复,会出现作业依然在,虽然不执行,原因是下面函数没有执行,理论上暂停再次恢复的流程是先修改时间job._modify(**changes)
,再跑一次循环self.wakeup()
,而wakeup的作用是在下面函数中执行一遍,但是出现下面问题:
- gunicorn的prefork模式如上述操作中没有进入到下面函数,只有进入_process_jobs才可能Miss;
- 重新启动应用的时候进入了,就是说再次启动这个作业会Miss;
未找到原因(有可能是由于prefork认为_main_loop是个阻塞的操作线程不切换?)
# .venv/lib/python3.6/site-packages/apscheduler/blocking.py
def _main_loop(self):
wait_seconds = TIMEOUT_MAX
while self.state != STATE_STOPPED:
self._event.wait(wait_seconds)
self._event.clear()
wait_seconds = self._process_jobs()
解决
# 执行时间过久
Scheduler.print_jobs()
使用uwsgi mule模块,分离web和apscheduler,通过grpc调用即可
note
这种创建方式貌似更好
app = create_app(config=ProductionConfig())
def job_listener(event):
get_ = "msg from job '%s'" % (event.job)
logging.info(get_)
# This code below never gets invoked when I check with worker_id() == 1
# The only time it is run is with worker_id() value of 0
app.sched = Scheduler()
app.sched.add_jobstore(ShelveJobStore('/tmp/apsched_%d' % uwsgi.worker_id()), 'file')
app.sched.add_listener(job_listener,
events.EVENT_JOB_EXECUTED |
events.EVENT_JOB_MISSED |
events.EVENT_JOB_ERROR)
app.sched.start()
参考
https://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/RPC.html
http://cn.voidcc.com/question/p-spijgpdn-bod.html
https://www.cnblogs.com/zhangliang91/p/11603916.html https://segmentfault.com/q/1010000010169238