flask部署apscheduler方案

[toc]

apscheduler部署方案

前言

下面所讲的apscheduler多进程,指的是使用flask_apscheduler与apscheduler结合起多个worker就起多个apscheduler,如果直接使用python自带的werkzeug启动或者单进程启动会少很多问题,这样的结果是并发量是有限制的。

gunicorn

apscheduler多进程

workers = 16

这种方案会导致起多个apscheduler,如果用锁,如文件锁控制只起一个的话,请求不落在启动的worker上则timer会处于pending状态

apscheduler多进程优化

workers = 16

--preload # 只在命令有效,如果配置在配置文件中不生效

这样实际上也只是起一个worker,而且gunicorn只能用prefork模式,这样对apscheduler会有影响。

总结

import multiprocessing
bind = '0.0.0.0:8203'    #绑定的ip及端口号
#workers = multiprocessing.cpu_count() * 2 + 1        #进程数
workers = 16
backlog = 2048        #监听队列,允许挂起的链接数
worker_class = "eventlet"
debug = True
chdir = '/app/microservice/timer_service/timer_app/'  #你项目的根目录,比如我的app.py文件在/home/ubuntu/app目录下,就填写'/home/ubuntu/app'
proc_name = 'gunicorn.proc'
daemon = True
pidfile = "/app/microservice/timer_service/deploy_config/gunicorn.pid"
reload=True
accesslog = "/app/microservice/timer_service/deploy_config/access.log"
errorlog = "/app/microservice/timer_service/deploy_config/error.log"
threads = 32

uwsgi

选择uwsgi一个很重要的原因是gunicorn只能是prefork模式,这种模式对apscheduler的影响后面会讲。

使用enable-threads启动,因为apscheduler放在后台的话,是以线程启动的

apscheduler多进程

不支持即便配置lazy-apps=true,现象与gunicorn.apscheduler多进程 一样

总结

[uwsgi]
http=:8203
projectpath = /app/microservice/timer_service
pythonpath = %(projectpath)/timer_app
wsgi-file = run.py
module = run
callable = app
processes = 8
threads = 64
enable-threads = true
preload=True
lazy-apps=true
master = false
pidfile = %(projectpath)/deploy_config/pid_%n.pid
#mule=init.py
#daemonize = %(projectpath)/logs/uwsgi_log.log

解决方案

问题

gunicorn由于prefork会导致暂停至超时再次恢复,会出现作业依然在,虽然不执行,原因是下面函数没有执行,理论上暂停再次恢复的流程是先修改时间job._modify(**changes),再跑一次循环self.wakeup(),而wakeup的作用是在下面函数中执行一遍,但是出现下面问题:

  • gunicorn的prefork模式如上述操作中没有进入到下面函数,只有进入_process_jobs才可能Miss;
  • 重新启动应用的时候进入了,就是说再次启动这个作业会Miss;

未找到原因(有可能是由于prefork认为_main_loop是个阻塞的操作线程不切换?)

# .venv/lib/python3.6/site-packages/apscheduler/blocking.py
    def _main_loop(self):
        wait_seconds = TIMEOUT_MAX
        while self.state != STATE_STOPPED:
            self._event.wait(wait_seconds)
            self._event.clear()
            wait_seconds = self._process_jobs()

解决

# 执行时间过久
Scheduler.print_jobs()

使用uwsgi mule模块,分离web和apscheduler,通过grpc调用即可

note

这种创建方式貌似更好

app = create_app(config=ProductionConfig())

def job_listener(event):
    get_ = "msg from job '%s'" % (event.job)
    logging.info(get_)

# This code below never gets invoked when I check with worker_id() == 1
# The only time it is run is with worker_id() value of 0
app.sched = Scheduler()
app.sched.add_jobstore(ShelveJobStore('/tmp/apsched_%d' % uwsgi.worker_id()), 'file')
app.sched.add_listener(job_listener,
                   events.EVENT_JOB_EXECUTED |
                   events.EVENT_JOB_MISSED |
                   events.EVENT_JOB_ERROR)
app.sched.start()

参考

https://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/RPC.html

http://cn.voidcc.com/question/p-spijgpdn-bod.html

https://www.cnblogs.com/zhangliang91/p/11603916.html https://segmentfault.com/q/1010000010169238

https://stackoverflow.com/questions/24352634/would-starting-apscheduler-in-a-uwsgi-app-end-up-with-one-scheduler-for-each-wor

本文作者:朝圣

本文链接:www.zh-noone.cn/2020/7/Scheduler

版权声明:本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0许可协议。转载请注明出处!

redis数据结构
0 条评论