首页
AI
测试
Search
1
Adobe GenP使用教程
350 阅读
2
PEA(Product Experience Assesment,产品体验评估)
209 阅读
3
DFX设计与实现
96 阅读
4
年后跳槽时间线
69 阅读
5
弱矩阵、强矩阵、 和平衡矩阵的区别
66 阅读
项目管理
产品管理
思想手册
E-Book
教程
Linux
Docker
MacOS
Windows
其他教程
sketch
Flask
python3
杂项
登录
Search
标签搜索
数据分析
电子书
变更
工作量评估
敏捷
模版
职级能力
debian11
Adobe
GenP
项目管理模版
香蕉你个不呐呐
累计撰写
158
篇文章
累计收到
0
条评论
首页
栏目
项目管理
产品管理
思想手册
E-Book
教程
Linux
Docker
MacOS
Windows
其他教程
sketch
Flask
python3
杂项
页面
AI
测试
搜索到
3
篇与
Flask
的结果
2024-11-14
如何在生产环境中后台运行Gunicorn
本文细解释如何在生产环境中后台运行Gunicorn。 如何将 gunicorn --workers 3 --bind 0.0.0.0:9999 app:app 命令在后台稳定运行,并能够处理进程重启和优雅关闭等情况。直接使用 & 在后台运行是不够可靠的。我们需要借助进程监控工具来实现。常用的有两种:systemd(Linux系统推荐)和Supervisor(跨平台)。方法一:使用 systemd (推荐用于Linux)创建 systemd 服务文件: 在 /etc/systemd/system/ 目录下创建一个新的文件,例如 gunicorn.service。 文件内容如下:CodeCopy[Unit]Description=Gunicorn实例用于myappAfter=network.target[Service]User=你的用户名 # 替换成你的用户名Group=你的用户组 # 替换成你的用户组WorkingDirectory=/你的应用路径 # 替换成你的应用所在目录的绝对路径Environment="PATH=/你的虚拟环境路径/bin:$PATH" # 可选:如果你使用了虚拟环境ExecStart=/你的虚拟环境路径/bin/gunicorn --workers 3 --bind 0.0.0.0:9999 app:app如果你没有使用虚拟环境,请将上面一行替换为:ExecStart=/gunicorn的路径/gunicorn --workers 3 --bind 0.0.0.0:9999 app:appRestart=alwaysRestartSec=10[Install]WantedBy=multi-user.target请务必替换以下占位符:你的用户名: 你的Linux用户名。你的用户组: 你的Linux用户组。/你的应用路径: 包含你的Flask应用 (app.py) 文件的目录的绝对路径。例如 /home/user/myapp。/你的虚拟环境路径: 如果你使用了虚拟环境 (virtualenv),这是你的虚拟环境的 bin 目录的绝对路径。如果没有使用虚拟环境,请删除 Environment 行并修改 ExecStart 行。 /gunicorn的路径 指向gunicorn可执行文件的路径启用并启动服务: 保存文件后,执行以下命令:CodeCopysudo systemctl enable gunicorn.servicesudo systemctl start gunicorn.service检查状态: 使用以下命令检查 Gunicorn 是否正常运行:CodeCopysudo systemctl status gunicorn.service这将显示服务的运行状态,包括日志和错误信息。方法二:使用 Supervisor (跨平台)Supervisor 是一个更通用的进程监控工具,可在多种操作系统上运行。你需要先安装它(例如,在 Debian/Ubuntu 上使用 sudo apt-get install supervisor)。然后,你需要编辑 Supervisor 的配置文件(通常位于 /etc/supervisor/conf.d/gunicorn.conf),配置类似于 systemd 的例子,但语法不同。请参考 Supervisor 的文档了解具体的配置细节。重要注意事项:错误日志: Gunicorn 的日志对于调试至关重要。确保它们已正确配置并可访问。systemd 和 Supervisor 通常有记录服务输出的机制。进程管理: 在生产环境中,进程监控工具是必不可少的。它负责重启、优雅关闭和监控 Gunicorn 进程。安全性: 出于安全原因,请以非 root 用户运行 Gunicorn。虚拟环境: 强烈建议使用虚拟环境来隔离应用程序的依赖项。请仔细检查所有路径和用户名是否正确。选择 systemd 或 Supervisor 中的一种方法,不要同时使用两者。 在 Linux 系统上,通常推荐使用 systemd,因为它与操作系统集成得更好。 如果还有问题,请提供你的操作系统类型和 app.py 文件所在的目录。
2024年11月14日
5 阅读
0 评论
0 点赞
2024-11-13
优化
这段代码主要问题在于重复代码过多,以及路由处理方式不够优雅。 soft、device、wuhan 三个路由几乎完全相同,只是IP白名单和部分PM列表不同。gantt相关的路由也存在大量重复。 此外,错误处理可以更完善。以下提供优化方案:重构 IP 白名单:将 ip_whitelist 改为更易于管理的结构,例如字典嵌套列表:ip_whitelist = { 'soft': [ ('孔鲁文', "172.16.33.191"), ('李嘉政', "172.16.27.248"), # ... ], 'device': [ ('孔鲁文', "172.16.33.191"), # ... ], # ... }这样可以简化IP校验逻辑。创建辅助函数:创建辅助函数来处理重复的逻辑,例如:def process_project_data(ip_list, pm_list, remove_projects, is_normal): """处理项目数据,用于 soft, device, wuhan 路由""" removeTemoList = [f" name <> '{project}'" for project in remove_projects] removeProject = 'AND'.join(removeTemoList) joinTemp = [f" PmUserId = '{pm_id}'" for pm_name, pm_id, enabled in pm_list if enabled] PM = 'OR'.join(joinTemp) if joinTemp else "1=0" #避免空字符串导致错误 tempDatajson = { 'Normal': { 'count': None, 'DetailedList': SerachProject(PM, removeProject, True) }, 'over': { 'count': None, 'DetailedList': SerachProject(PM, removeProject, False) } } return render_template('base.html', tempDatajson=tempDatajson) # 创建一个 base.html 模板 def check_ip_and_auth(ip, ip_list, route_name): """检查IP地址和权限""" if ip in [ip_addr for _, ip_addr in ip_list]: whiteKeyName = [name for name, ip_addr in ip_list if ip_addr == ip][0] logger.info(f'{whiteKeyName} | {ip} 访问/{route_name}') return True else: logger.info(f'未授权 | {ip} 访问/{route_name}') return False def auth_required(func): """装饰器,用于检查权限""" def wrapper(*args, **kwargs): ip = request.remote_addr if not check_ip_and_auth(ip, ip_whitelist[kwargs['route_name']], kwargs['route_name']): return render_template("AuthFailed.html") return func(*args, **kwargs) return wrapper使用装饰器简化路由:使用装饰器 auth_required 来简化路由的权限检查:@app.route('/device', methods=['GET', 'POST']) @auth_required def device(): return process_project_data(ip_whitelist['device'], device_pm_list, remove_projects, True) @app.route('/soft', methods=['GET', 'POST']) @auth_required def soft(): return process_project_data(ip_whitelist['soft'], soft_pm_list, remove_projects, True) @app.route('/wuhan', methods=['GET', 'POST']) @auth_required def wuhan(): return process_project_data(ip_whitelist['wuhan'], wuhan_pm_list, remove_projects, True)你需要定义 device_pm_list, soft_pm_list, wuhan_pm_list 和 remove_projects 变量。重构 Gantt 路由:gantt 相关的路由也存在大量重复,可以提取公共部分:def handle_gantt(redis_name, pm_list, statement, template_name): today = datetime.now().strftime("%Y-%m-%d") redis_name = f'{redis_name}_{today}' try: random_string = request.args.get('token') if random_string == settings.gantt_tokens[template_name]: # 将token放到settings中 tempDatajson = project_gantt_public(redis_name, pm_list, statement) return render_template(template_name, tempDatajson=tempDatajson, today=today) else: return render_template("AuthFailed.html") except Exception as e: print(f"Error in gantt(): {e}") return "重新请求尝试" @app.route('/gantt', methods=['GET', 'POST']) def project_gantt_all_in_one(): return handle_gantt('project_gantt', ['李嘉政', '王清臣', ...], '', 'gantt.html') @app.route('/gantt_huishiyun', methods=['GET', 'POST']) def project_gantt_huishiyun(): return handle_gantt('project_gantt_huishiyun', ['李嘉政'], " AND a.ProjectBaseName NOT LIKE '%平台软件日常支撑(2024)%'", 'gantt.html') # ... 其他 Gantt 路由类似处理 ...在 settings.py 中定义 gantt_tokens 字典来存储每个 Gantt 路由的 token。改进错误处理:使用 Flask 的 errorhandler 装饰器来处理异常,提供更友好的错误页面。定时任务:定时任务部分代码可以优化,例如使用更简洁的数据库查询和数据处理方式。 可以考虑使用更强大的数据库ORM框架(如SQLAlchemy)来简化数据库操作。代码规范:使用更具描述性的变量名。保持代码缩进一致。添加必要的注释。总而言之,通过重构、提取公共方法、使用装饰器和改进错误处理,可以显著提高代码的可读性、可维护性和可扩展性。 记住,代码优化是一个迭代的过程,需要逐步改进。 我提供的方案是一个起点,你可以根据实际情况进行调整和完善。 你需要根据自己的需求创建 base.html 模板以及其他必要的修改。 也请确保你的 settings.py 文件包含了必要的配置信息。提供一个更结构化、更易于维护的代码框架,你需要根据你的实际环境和数据库配置进行调整。settings.py: import os # 数据库配置 DATABASE_URL = os.environ.get('DATABASE_URL') or '你的数据库连接字符串' # Redis 配置 REDIS_HOST = '192.168.10.100' REDIS_PORT = 6379 REDIS_PASSWORD = 'lijiazheng' REDIS_DB = 0 # 日志文件路径 LOG_FILE = '/home/bugsSerachZentao/logfile.log' LOG_MAX_BYTES = 1e6 LOG_BACKUP_COUNT = 3 # Gantt token gantt_tokens = { 'gantt.html': 'AoVv9Pba3sGjZoGEMjKgs6Bhaw29AaZnzMsEQdwHyqmWY_aio', 'gantt.html': 'N2ALqcECzjiZv9N6kJYBPBW9EJMNHVqL9QvtXTrCdZrmP_lijzh', 'gantt.html': '65KEq8sgQGUK8TQiUCB2MzZJjnlK7TVVevE8TZqk4WVBv_wqc', 'gantt.html': 'RaqWTAGqke46WHdmRep6N9g3fuGxQDPnjE973H7mCaYST_lcy', 'gantt.html': 'or8Alc3NZt9jx5fR79GFQdWrsDAqZDuCldAX4pDgimG73_platform', 'gantt.html': 'Qqu9w3fhJQfWd82U23Vpf8AoVUViYMevdexqQHC3FqERT_testing', 'test.html': 'test_46QYcgwBmmqAkRWL95VkH6aasd7WsYziQiChdWv4f3LbNcsfNGduudyDrgW5uUaV_test', 'iorz.html': '^gwUSSNwzUoRhgg7CZwSUQQUDiWG4XxZqT526DUgaaZhjb94Y95tTUz3w%%oc', 'ptyf.html': '^gwUSSNwzUoRhgg7CZwSUQQUDiWG4XxZqT526DUgaaZhjb94Y95tTUz3w%%oa', 'beziquePlus.html': 'V98cD8H7CZwSUQQUDiWG4XxZqT526DUgHGkHb94Y95t6Wz', } # ...其他配置... app.py: from flask import Flask, request, render_template, redirect from DBmodel.bugs import get_project_list # 假设这个模块存在 from DBmodel.info import zantao_project_baseInfo # 假设这个模块存在 import settings, warnings from logzero import logger, logfile from package.tools import Tools # 假设这个模块存在 from Model.project_info import SerachProject # 假设这个模块存在 from Model.timeLine import timelines # 假设这个模块存在 from Model.orz import orzs # 假设这个模块存在 from Model.ptyf import ptyfb # 假设这个模块存在 from Model.beziquePlus import bezique # 假设这个模块存在 from Model.project_gantt import gantt # 假设这个模块存在 from datetime import datetime from apscheduler.schedulers.background import BackgroundScheduler import redis, json from DBmodel.db_report_oa import MysqlDB # 假设这个模块存在 from decimal import Decimal from collections import defaultdict warnings.filterwarnings("ignore") # ... (IP 白名单部分,参考之前的重构方案) ... app = Flask(__name__) app.config.from_object(settings) redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, password=settings.REDIS_PASSWORD, db=settings.REDIS_DB) tools = Tools() logfile(settings.LOG_FILE, maxBytes=settings.LOG_MAX_BYTES, backupCount=settings.LOG_BACKUP_COUNT) # ... (辅助函数,参考之前的方案) ... # 路由 @app.route('/', methods=['GET', 'POST']) def index(): # ... (index 路由逻辑,参考之前的代码,简化重复部分) ... @app.route('/device', methods=['GET', 'POST']) @auth_required('device') def device(): return process_project_data(ip_whitelist['device'], device_pm_list, remove_projects, True, 'device.html') @app.route('/soft', methods=['GET', 'POST']) @auth_required('soft') def soft(): return process_project_data(ip_whitelist['soft'], soft_pm_list, remove_projects, True, 'soft.html') @app.route('/wuhan', methods=['GET', 'POST']) @auth_required('wuhan') def wuhan(): return process_project_data(ip_whitelist['wuhan'], wuhan_pm_list, remove_projects, True, 'wuhan.html') @app.route('/timeline', methods=['GET', 'POST']) def timeline(): tempDatajson = timelines() return render_template('timeline.html', tempDatajson=tempDatajson) @app.route('/orz', methods=['GET', 'POST']) @auth_required('jiaban') def orz(): tempDatajson = orzs() return render_template('orz.html', tempDatajson=tempDatajson) @app.route('/ptyf', methods=['GET', 'POST']) def ptyf(): # ... (ptyf 路由逻辑) ... @app.route('/beziquePlus', methods=['GET', 'POST']) def ptbeziquePlus(): # ... (beziquePlus 路由逻辑) ... @app.route('/gantt', methods=['GET', 'POST']) def project_gantt_all_in_one(): return handle_gantt('project_gantt', ['李嘉政', '王清臣', ...], '', 'gantt.html') @app.route('/gantt_huishiyun', methods=['GET', 'POST']) def project_gantt_huishiyun(): return handle_gantt('project_gantt_huishiyun', ['李嘉政'], " AND a.ProjectBaseName NOT LIKE '%平台软件日常支撑(2024)%'", 'gantt.html') # ... 其他 Gantt 路由 ... @app.route('/test', methods=['GET', 'POST']) def test(): return handle_gantt('project_gantt_huishiyun', ['李嘉政', '王清臣', ...], " AND a.ProjectBaseName NOT LIKE '%平台软件日常支撑(2024)%'", 'test.html') @app.route('/iorz', methods=['GET', 'POST']) @auth_required('jiaban') def iorz(): # ... (iorz 路由逻辑) ... # ... (错误处理) ... # ... (定时任务) ... if __name__ == '__main__': scheduler = BackgroundScheduler() scheduler.add_job(cron_OvertimeData, 'cron', hour=16, minute=37) scheduler.add_job(cron_getProjectTaskDetails, 'cron', hour=7, minute=30) scheduler.start() app.run(host='0.0.0.0', port=8080, debug=True) 其他模块 (Model, DBmodel 等):你需要根据你的实际情况完善这些模块。 确保你的数据库连接、数据模型以及其他依赖都正确配置。注意: 你需要替换占位符,例如数据库连接字符串,以及完善 device_pm_list, soft_pm_list, wuhan_pm_list, remove_projects 等变量,以及 handle_gantt 函数中的 pm_list 和 statement 参数。 还需要创建 base.html, device.html, soft.html, wuhan.html, gantt.html, test.html, iorz.html, timeline.html, ptyf.html, beziquePlus.html 等模板文件。 这个代码框架只是为了展示如何改进代码结构,你仍然需要根据你的实际需求进行调整和完善。 尤其要仔细检查数据库查询语句,避免SQL注入漏洞。 建议使用参数化查询来防止SQL注入。
2024年11月13日
5 阅读
0 评论
0 点赞
2024-11-13
Flask定时任务
方法一:使用 interval 参数 (每小时执行一次)from apscheduler.schedulers.background import BackgroundScheduler def cron_OvertimeData(): print('123') if __name__ == '__main__': scheduler = BackgroundScheduler() scheduler.add_job(cron_OvertimeData, 'interval', hours=1) # 每小时执行一次 scheduler.start() app.run(host='0.0.0.0', port=8080, debug=True)方法二:使用 cron 参数,更精确控制 (每小时的0分执行)from apscheduler.schedulers.background import BackgroundScheduler def cron_OvertimeData(): print('123') if __name__ == '__main__': scheduler = BackgroundScheduler() scheduler.add_job(cron_OvertimeData, 'cron', minute='0') # 每小时的0分执行 scheduler.start() app.run(host='0.0.0.0', port=8080, debug=True)方法三:结合 date 参数,设置起始时间和结束时间 (每小时执行一次,持续10小时)from apscheduler.schedulers.background import BackgroundScheduler from datetime import datetime, timedelta def cron_OvertimeData(): print('123') if __name__ == '__main__': scheduler = BackgroundScheduler() start_time = datetime.now() end_time = start_time + timedelta(hours=10) #运行10小时 scheduler.add_job(cron_OvertimeData, 'interval', hours=1, start_date=start_time, end_date=end_time) scheduler.start() app.run(host='0.0.0.0', port=8080, debug=True)重要提示: 这些代码片段都假设你已经安装了 APScheduler 库 (pip install apscheduler),并且你的代码中已经存在 app 对象 (例如,Flask 应用)。 你需要根据你的实际应用环境进行调整。 特别是 app.run() 部分,需要根据你的Web框架进行修改。 如果你的应用不是基于Flask,你需要替换成你的框架的运行方法。 另外,确保你的程序能够持续运行,否则定时任务将无法正常执行。 在生产环境中,建议使用更健壮的进程管理工具来保证程序的稳定运行。是的,APScheduler 提供了更丰富的功能,可以补充到之前的例子中,使定时任务更加强大和灵活。以下是一些补充用法:1. 任务参数传递:你可以向定时任务函数传递参数:from apscheduler.schedulers.background import BackgroundScheduler from datetime import datetime, timedelta def my_task(param1, param2): print(f"Task executed with parameters: {param1}, {param2}") if __name__ == '__main__': scheduler = BackgroundScheduler() scheduler.add_job(my_task, 'interval', hours=1, args=['hello', 123]) scheduler.start() # ... your app.run() ...2. 任务ID和替换:你可以为每个任务分配一个ID,方便管理和替换:from apscheduler.schedulers.background import BackgroundScheduler def my_task(): print("Task executed") if __name__ == '__main__': scheduler = BackgroundScheduler() job = scheduler.add_job(my_task, 'interval', hours=1, id='my_unique_job_id') # ... later, you can replace the job ... scheduler.remove_job('my_unique_job_id') new_job = scheduler.add_job(my_task, 'interval', hours=2, id='my_unique_job_id') scheduler.start() # ... your app.run() ...3. 任务的暂停和恢复:你可以暂停和恢复特定的任务:from apscheduler.schedulers.background import BackgroundScheduler def my_task(): print("Task executed") if __name__ == '__main__': scheduler = BackgroundScheduler() job = scheduler.add_job(my_task, 'interval', hours=1, id='my_job') # ... later, pause the job ... job.pause() # ... later, resume the job ... job.resume() scheduler.start() # ... your app.run() ...4. 错误处理:使用 misfire_grace_time 参数设置任务错过执行时间的容忍时间,避免因为短暂的延迟而导致任务丢失:from apscheduler.schedulers.background import BackgroundScheduler def my_task(): print("Task executed") if __name__ == '__main__': scheduler = BackgroundScheduler() scheduler.add_job(my_task, 'interval', hours=1, misfire_grace_time=60) # 允许延迟60秒 scheduler.start() # ... your app.run() ...5. 使用不同的调度器:除了 BackgroundScheduler,APScheduler 还提供了 BlockingScheduler (阻塞调度器,适合简单的脚本) 和 AsyncIOScheduler (异步调度器,适合异步环境)。 选择合适的调度器取决于你的应用架构。6. 监听事件:APScheduler 提供了事件监听机制,可以让你在任务执行前、执行后或出现错误时执行相应的操作:from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR def my_listener(event): if event.exception: print(f"Job {event.job_id} failed: {event.exception}") else: print(f"Job {event.job_id} executed successfully") if __name__ == '__main__': scheduler = BackgroundScheduler() scheduler.add_job(my_task, 'interval', hours=1, id='my_job') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) scheduler.start() # ... your app.run() ...这些补充用法可以让你更精细地控制定时任务的执行,并处理各种异常情况。 选择哪些用法取决于你的具体需求。 记住查阅 APScheduler 的官方文档以获取更详细的信息和更多高级功能。
2024年11月13日
3 阅读
0 评论
0 点赞