2023怡然:算力远程调度 接口
来自CloudWiki
代码
# Remote compute-power scheduling API (demo).
#
# A small Flask service that keeps compute-resource applications in an
# in-memory list and exposes endpoints to create, list, inspect and review
# them.  The *_database functions are empty placeholders reserved for a
# future persistence layer; nothing is persisted across restarts.
import datetime
import os

from flask import Flask, jsonify, request

app = Flask(__name__)

# Timestamp layout shared by every time field stored in ``applications``.
_TIME_FORMAT = "%Y-%m-%d %H:%M"

# Fields a client must supply when creating an application.
_REQUIRED_FIELDS = (
    'user_name',
    'apply_time',
    'use_time',
    'duration',
    'type',
    'status',
    'process_by',
)

# In-memory store of applications, seeded with two sample records.
applications = [
    {
        "id": 1,
        "user_id": 1,
        "user_name": "zhangli",
        "apply_time": "2023-08-10 11:09",
        "use_time": "2023-08-11 16:00",
        "duration": 2,
        "type": "1核2G",
        "price": 4,
        "status": "待审核",
        "process_time": None,
        "process_by": "小王",
    },
    {
        "id": 2,
        "user_id": 2,
        "user_name": "wangming",
        "apply_time": "2023-08-12 11:09",
        "use_time": "2023-08-12 09:00",
        "duration": 3,
        "type": "2核2G",
        "price": 4,
        "status": "待审核",
        "process_time": None,
        "process_by": "小王",
    },
]


def _json_object():
    """Return the request body as a dict, or None if it is not a JSON object.

    ``silent=True`` makes a missing, malformed or wrongly typed body come
    back as None instead of aborting the request, so the handlers can
    answer with their own JSON error payload.
    """
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else None


def _find_application(application_id):
    """Return the stored application with the given id, or None."""
    return next((a for a in applications if a['id'] == application_id), None)


# Create a new compute-power application: POST /application
@app.route('/application', methods=['POST'])
def create_application():
    """Create an application from the JSON request body.

    Returns:
        201 with the created record, or 400 with an ``error`` message when
        the body is not a JSON object or a required field is missing.
    """
    data = _json_object()
    if data is None:
        return jsonify({'error': 'Request body must be a JSON object'}), 400
    if any(field not in data for field in _REQUIRED_FIELDS):
        return jsonify({'error': 'Missing required fields'}), 400

    new_application = {
        # max(..., default=0) also works when the list is empty or unsorted,
        # unlike reading the id of the last element.
        "id": max((a['id'] for a in applications), default=0) + 1,
        # NOTE(review): user_id is hard-coded; presumably a placeholder
        # until a real user identity is available - confirm.
        "user_id": 1,
        "user_name": data.get('user_name'),
        "apply_time": data.get('apply_time'),
        "use_time": data.get('use_time'),
        "duration": data.get('duration'),
        "type": data.get('type'),
        "price": data.get('price'),  # optional; None when not supplied
        "status": data.get('status'),
        # Present (as None) so new records have the same keys as the seeds.
        "process_time": None,
        "process_by": data.get('process_by'),
    }
    applications.append(new_application)

    return jsonify({'message': 'Application created',
                    'application': new_application}), 201


# Reserved hook for a future persistence layer.
def get_applications_from_database():
    """Placeholder: not implemented, returns None."""


# List all applications: GET /applications/list
@app.route('/applications/list', methods=['GET'])
def get_applications():
    """Return every stored application."""
    return jsonify({
        # The misspelled key is what this endpoint has always returned; it
        # is kept so existing clients keep working.
        'applicaions': applications,
        'applications': applications,
    })


# Reserved hook for a future persistence layer.
def get_application_from_database(application_id):
    """Placeholder: not implemented, returns None."""


# Show one application: GET /applications/<id>
@app.route('/applications/<int:id>', methods=['GET'])
def get_application(id):
    """Return the application with the given id, or 404 if it is unknown.

    The parameter is named ``id`` because Flask passes the URL variable
    ``<int:id>`` by keyword.
    """
    application = _find_application(id)
    if application is None:
        return jsonify({'error': 'Application not found'}), 404
    return jsonify(application)


# Review an application (change its status): PUT /application/<id>
@app.route('/application/<int:id>', methods=['PUT'])
def update_application(id):
    """Approve or reject the application with the given id.

    The JSON body may carry ``process_by`` (the reviewer) and ``status``.
    A status of ``"agree"`` approves the application; any other value,
    including a missing one, rejects it.

    Returns:
        200 with a ``message``, 404 if the id is unknown, or 400 if the
        body is not a JSON object.
    """
    application = _find_application(id)
    if application is None:
        return jsonify({'error': 'Application not found'}), 404

    body = _json_object()
    if body is None:
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    # Only replace the reviewer when one is supplied, so an omitted field
    # does not wipe the existing value.
    if 'process_by' in body:
        application['process_by'] = body['process_by']

    # Stored as a string in the same layout as apply_time/use_time.  A raw
    # naive datetime would be rendered by Flask's default JSON serialisation
    # as an HTTP date labelled GMT, although it holds local time.
    application['process_time'] = datetime.datetime.now().strftime(_TIME_FORMAT)

    status = body.get('status')
    app.logger.info("Application %s reviewed by %s with status %r",
                    id, application['process_by'], status)

    if status == "agree":
        application['status'] = '同意'
        return jsonify({'message': 'Application replied successfully'})

    application['status'] = '不同意'
    return jsonify({'message': 'Application replied failed'})


# Reserved hook for a future persistence layer.
def update_application_in_database(application):
    """Placeholder: not implemented, returns None."""


if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger
    # (which lets a client execute arbitrary code) must not be on by
    # default.  Set FLASK_DEBUG=1 to enable it for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', debug=debug_enabled)
运行:
运行:nohup python3 manager.py > flask_log.txt 2>&1 &