2023怡然:后端全部代码
来自CloudWiki
Ansible安装
Flask代码
manager.py:
from flask import Flask, jsonify, abort, request import datetime import pymysql conn1 = pymysql.connect(host='localhost', user='root', password='000000', port=3306, db='yiran') app = Flask(__name__) workOrder= [ { 'id': 1, 'user_id': 'xiaoli', 'title': u'电脑蓝屏', 'room':u'812', 'pc_id':u'pc01', 'desc':u'电脑蓝屏故障,持续10分钟', 'report_time':u'2023-08-01 14:52', 'receive_time':u'2023-08-01 15:52', 'solve_time':u'2023-08-01 16:52', 'status':u'未解决', 'type':u'单机故障', 'solved_by':u'engineer01' }, { 'id': 2, 'user_id': 'xiaoli', 'title': u'上不了网', 'room':u'812', 'pc_id':u'pc01', 'desc':u'上不了网,持续5分钟', 'report_time':u'2023-08-01 14:52', 'receive_time':u'2023-08-01 15:52', 'solve_time':u'2023-08-01 16:52', 'status':u'已解决', 'type':u'网络故障', 'solved_by':u'engineer02' }, ] #查看工单列表,GET操作 @app.route('/', methods=['GET']) @app.route('/workorder/api/v1/order_list', methods=['GET']) def get_tasks(): return jsonify({'workOrder': workOrder}) #get one specific order,GET操作 @app.route('/workorder/api/v1/order/<int:id>', methods=['GET']) def get_task(id): for order in workOrder: if order['id']==id: return jsonify({'workOrder': order}) abort(404) #insert a new order,POST操作 @app.route('/workorder/api/v1/add_order', methods=['POST']) def create_task(): if not request.form or not 'title' in request.form: abort(400) new_order= { 'id': workOrder[-1]['id'] + 1, 'user_id': request.form['user_id'], 'title': request.form['title'], 'room': request.form[''], 'pc_id': request.form[''], 'desc': request.form[''], 'report_time': request.form[''], 'receive_time': '', 'solve_time': '', 'status': '待查看', 'type': request.form[''], 'solved_by':'' } workOrder.append(new_order) return jsonify({'new_Order': new_order}) #update a task,PUT操作 @app.route('/workorder/api/v1/order/<int:id>/<string:action>/<string:engineer>', methods=['PUT']) def update_task(id): for order in workOrder: if order['id'] == id: order['solved_by'] = request.form['engineer'] action = request.form['action'] current_time = datetime.datetime.now() #print(current_time) if action == 'receive' : order['receive_time'] =current_time order['status']='正在处理' elif action == 'solve': order['solve_time'] =current_time order['status']= '已解决' return jsonify({'workOrder': workOrder}) abort(400) #delete a book,DELETE操作 @app.route('/workorder/api/v1/order/<int:id>', methods=['DELETE']) def delete_task(id): for order in workOrder: if order['id'] == id: workOrder.remove(order) return jsonify({'result': True}) abort(404) return jsonify({'result': True}) ####算力申请#### applications =[ { "id":1, "user_id":1, "user_name":"zhangli", "apply_time":"2023-08-10 11:09", "use_time":"2023-08-11 16:00", "duration":2, "type":"1核2G", "price":4, "status":"待审核", "process_time":None, "process_by":"小王" }, { "id": 2, "user_id": 2, "user_name": "wangming", "apply_time": "2023-08-12 11:09", "use_time": "2023-08-12 09:00", "duration": 3, "type": "2核2G", "price": 4, "status": "待审核", "process_time":None, "process_by": "小王" }, ] #新建算力申请 /application/ @app.route('/application', methods=['POST']) def create_application(): data = request.json # 检查 if 'user_name' not in data or 'apply_time' not in data \ or 'use_time' not in data or 'duration' not in data or 'type' not in data \ or 'status' not in data or 'process_by' not in data: return jsonify({'error': 'Missing required fields'}), 400 # 创建新申请 new_application = { "id": applications[-1]['id']+1, "user_id": 1, "user_name": request.json.get('user_name'), "apply_time": request.json.get('apply_time'), "use_time": request.json.get('use_time'), "duration":request.json.get('duration'), "type": request.json.get('type'), "price": request.json.get('price'), "status": request.json.get('status'), "process_by": request.json.get('process_by'), } applications.append(new_application) # 返回成功的响应 return jsonify({'message': 'Application created', 'application': new_application}), 201 def get_applications_from_database(): pass #查看申请列表 @app.route('/applications/list', methods=['GET']) def get_applications(): #applications = get_applications_from_database() global applications return jsonify({'applicaions':applications}) def get_application_from_database(application_id): pass #查看某条申请详情 @app.route('/applications/<int:id>', methods=['GET']) def get_application(id): # 获取申请详情的逻辑 application =None #application = 算力远程调度.可用的.demo1.get_application_from_database(application_id) # 从数据库中获取指定申请的详情 for a in applications: if a['id'] == id: application =a break if application is None: return jsonify({'error': 'Application not found'}), 404 # 如果申请不存在,则返回404错误 return jsonify(application) ##预留接口 def get_applications_from_database(): pass #批复申请(修改申请状态) @app.route('/application/<int:id>', methods=['PUT']) def update_application(id): print(id) for a in applications: print(id,a['id'],a['id'] == id) if a['id'] == id: print("success") print(request.json.get('process_by')) a['process_by'] = request.json.get('process_by') status = request.json.get('status') print(status,"agree") a['process_time'] = datetime.datetime.now() # print(current_time) if status == "agree": a['status'] = '同意' return jsonify({'message': 'Application replied successfully'}) else: a['status'] = '不同意' return jsonify({'message': 'Application replied failed'}) return jsonify({'error': 'Application not found'}), 404 # 如果申请不存在,则返回404错误 ##预留接口 def update_application_in_database(application): pass ########报警信息####### warnings =[ { "id":1, "device_id":1, "room_name":"机房306", "broken_time":"2023-08-10 11:09", "broken_level":"fatal", "title":"cpu 负载过高", "detail":"cpu high-level,needs to be done", "isdone":0 }, { "id":2, "device_id":2, "room_name":"机房307", "broken_time":"2023-08-10 11:09", "broken_level":"fatal", "title":"内存负载过高", "detail":"cpu high-level,needs to be done", "isdone":0 }, { "id":3, "device_id":3, "room_name":"机房308", "broken_time":"2023-08-10 11:09", "broken_level":"warning", "title":"网络流量过大", "detail":"network high-level,needs to be done", "isdone":0 }, ] #查看申请列表 @app.route('/warnings/list', methods=['GET']) def get_warnings(): #applications = get_applications_from_database() global warnings return jsonify({'warnings':warnings}) #查看某条警告详情 @app.route('/warnings/<int:id>', methods=['GET']) def get_warning(id): # 获取申请详情的逻辑 warning =None for a in warnings: if a['id'] == id: warning =a break if warning is None: return jsonify({'error': 'Warning not found'}), 404 # 如果申请不存>在,则返回404错误 return jsonify(warning) ######机器远程管理###### from test_ansible2 import do_ansible #get one specific book,GET操作 rooms =[ { "close_num": 1, "id": 311, "running_num": 0, "title": "机房311", "total_num": 1 }, { "close_num": 2, "id": 810, "running_num": 0, "title": "机房810", "total_num": 2 }, { "close_num": 2, "id": 804, "running_num": 1, "title": "机房804", "total_num": 3 }, { "close_num": 4, "id": 805, "running_num": 0, "title": "机房805", "total_num": 4 } ] #查看全部机房列表(无数据库) @app.route('/rooms_test', methods=['GET', 'POST']) def get_rooms_list_test(): return jsonify(rooms) abort(404) #查看全部机房列表(有数据库) ''' @app.route('/rooms', methods=['GET', 'POST']) def get_rooms_list(): conn1.ping(reconnect=True) cur = conn1.cursor() sql = "SELECT * from ComputerRoom LIMIT 4" cur.execute(sql) u = cur.fetchall() jsondata = [] for row in u: result = {} result['id'] = row[0] result['title']="机房"+str(row[0]) result['total_num'] = row[1] result['running_num'] = row[2] result['close_num'] = int(row[3])-int(row[2]) jsondata.append(result) return jsonify(jsondata) abort(404) ''' # 机房详细的机器列表(有数据库) @app.route('/room/<int:id>', methods=['GET', 'POST']) def get_room(id): conn1.ping(reconnect=True) cur = conn1.cursor() sql = f"SELECT * from ComputerRoom where room_id={id}" cur.execute(sql) u = cur.fetchall() jsondata = [] for row in u: result = {} result['room_id']=row[1] result['pc_id'] = row[2] result['status'] = row[3] jsondata.append(result) return jsonify(jsondata) abort(404) #关闭某机房内某台机器 close one machines of one room,GET操作 @app.route('/ansible/api/v1/machine_close/<string:host>', methods=['GET']) def shutdown_machine(host): host_list=[host] tasks_close=[ dict(action=dict(module='shell', args='shutdown -h now'), register='shell_out'), ] do_ansible(host_list,tasks_close) return jsonify({'status': host+"shutdown order is sent"}) abort(404) #房间机器列表 machine_list = { "806":[ "114.116.250.34", "124.70.107.73", "117.78.0.192", ], } #一键关闭某机房所有机器 close all machines of one room,GET操作 @app.route('/ansible/api/v1/machine_close/all/<string:room>', methods=['GET']) def shutdown_machine_all(room): host_list=machine_list[room] tasks_close=[ dict(action=dict(module='shell', args='shutdown -h now'), register='shell_out'), ] do_ansible(host_list,tasks_close) return jsonify({'status': room +"shutdown order is sent"}) abort(404) #重启某机房内某台机器 restart one machine of one room ,GET操作 @app.route('/ansible/machine_restart/single/<string:host>/<string:pc_id>', methods=['GET']) def restart_one_machine(host,pc_id): host_list=[host] tasks_reboot=[ dict(action=dict(module='shell', args='reboot'), register='shell_out'), ] do_ansible(host_list,tasks_reboot) return jsonify({'status': host+"_pc_"+pc_id+"restart order is sent"}) abort(404) #一键重启机房所有机器restart all machines of one room ,GET操作 @app.route('/ansible/machine_restart/all/<string:room>', methods=['GET']) def restart_machine(room): host_list=machine_list[room] tasks_reboot = [ dict(action=dict(module='shell', args='reboot'), register='shell_out'), ] do_ansible(host_list, tasks_reboot) return jsonify({'status': room+"restart order is sent"}) abort(404) ###### 机器健康监测 ####### #机房健康监测 机房房间故障统计 @app.route('/room/unhealthy_machines/all', methods=['GET','POST']) def get_room_unhealthy_data( ): jsondata =[ {'id':'311','total':45,'bad':2},{'id':'810','total':45,'bad':3}, \ {'id':'810','total':45,'bad':5},{'id':'804','total':45,'bad':2},{'id':'805','total':45,'bad':3}, \ {'id':'704','total':45,'bad':1}] return jsonify(jsondata) abort(404) # 机器健康检测:查看某机房所有故障机器列表,id 代表机房id号 @app.route('/room/unhealthy_machines/<int:room_id>', methods=['GET', 'POST']) def get_room_unhealthy_pc(room_id): conn1.ping(reconnect=True) cur = conn1.cursor() sql = f"SELECT * from ComputerRoom where room_id={room_id} and report_type is NOT NULL" cur.execute(sql) u = cur.fetchall() jsondata = [] for row in u: result = {} result['id'] = row[0] result['room_id'] = row[1] result['machine_id'] = row[2] result['machine_status'] = row[3] result['report_type'] = row[4] result['report_content'] = row[5] # 增加健康状态字段:0代表健康,1代表不健康 jsondata.append(result) return jsonify(jsondata) abort(404) ######视频远程监控###### from flask import Flask, jsonify, abort, request, render_template,Response,request,url_for import requests #导包 import numpy as np import time import os img_num =1 from flask import Flask, send_file import os @app.route('/image') def image(): global img_num #os.remove('data/frame'+str(img_num)+".jpg") #img_num +=1 img_num =9 return send_file('data/frame'+str(img_num)+".jpg", mimetype='image/jpg') if __name__ == '__main__': app.run(host='0.0.0.0',port="5003",debug=True)
Andible调用程序
test_ansible2.py:
#!/usr/bin/env python from __future__ import (absolute_import, division, print_function) __metaclass__ = type import json import shutil import ansible.constants as C from ansible.executor.task_queue_manager import TaskQueueManager from ansible.module_utils.common.collections import ImmutableDict from ansible.inventory.manager import InventoryManager from ansible.parsing.dataloader import DataLoader from ansible.playbook.play import Play from ansible.plugins.callback import CallbackBase from ansible.vars.manager import VariableManager from ansible import context # Create a callback plugin so we can capture the output class ResultsCollectorJSONCallback(CallbackBase): """A sample callback plugin used for performing an action as results come in. If you want to collect all results into a single object for processing at the end of the execution, look into utilizing the ``json`` callback plugin or writing your own custom callback plugin. """ def __init__(self, *args, **kwargs): super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs) self.host_ok = {} self.host_unreachable = {} self.host_failed = {} def v2_runner_on_unreachable(self, result): host = result._host self.host_unreachable[host.get_name()] = result def v2_runner_on_ok(self, result, *args, **kwargs): """Print a json representation of the result. Also, store the result in an instance attribute for retrieval later """ host = result._host self.host_ok[host.get_name()] = result print(json.dumps({host.name: result._result}, indent=4)) def v2_runner_on_failed(self, result, *args, **kwargs): host = result._host self.host_failed[host.get_name()] = result def do_ansible(host_list,tasks): # since the API is constructed for CLI it expects certain options to always be set in the context object context.CLIARGS = ImmutableDict(connection='smart', module_path=['/to/mymodules', '/usr/share/ansible'], forks=10, become=None, become_method=None, become_user=None, check=False, diff=False, verbosity=0) # required for # https://github.com/ansible/ansible/blob/devel/lib/ansible/inventory/manager.py#L204 sources = ','.join(host_list)#定义主机列表 if len(host_list) == 1: sources += ',' # initialize needed objects loader = DataLoader() # Takes care of finding and reading yaml, json and ini files passwords = dict(vault_pass='secret') # Instantiate our ResultsCollectorJSONCallback for handling results as they come in. Ansible expects this to be one of its main display outlets results_callback = ResultsCollectorJSONCallback()#初始化回调函数 # create inventory, use path to host config file as source or hosts in a comma separated string inventory = InventoryManager(loader=loader, sources=sources)#创建主机清单 # variable manager takes care of merging all the different sources to give you a unified view of variables available in each context variable_manager = VariableManager(loader=loader, inventory=inventory)#变量管理器 # instantiate task queue manager, which takes care of forking and setting up all objects to iterate over host list and tasks # IMPORTANT: This also adds library dirs paths to the module loader # IMPORTANT: and so it must be initialized before calling `Play.load()`. #初始化任务队列管理器 tqm = TaskQueueManager( inventory=inventory, variable_manager=variable_manager, loader=loader, passwords=passwords, stdout_callback=results_callback, # Use our custom callback instead of the ``default`` callback plugin, which prints to stdout ) # create data structure that represents our play, including tasks, this is basically what our YAML loader does internally. #创建剧本的数据结构 play_source = dict( name="Ansible Play", hosts=host_list, gather_facts='no', tasks=tasks ) # Create play object, playbook objects use .load instead of init or new methods, # this will also automatically create the task objects from the info provided in play_source #创建剧本对象 play = Play().load(play_source, variable_manager=variable_manager, loader=loader) # Actually run it #实际运行剧本 try: result = tqm.run(play) # most interesting data for a play is actually sent to the callback's methods finally: # we always need to cleanup child procs and the structures we use to communicate with them tqm.cleanup() if loader: loader.cleanup_all_tmp_files() # Remove ansible tmpdir shutil.rmtree(C.DEFAULT_LOCAL_TMP, True) print("UP ***********") for host, result in results_callback.host_ok.items(): print('{0} >>> {1}'.format(host, result._result['stdout'])) print("FAILED *******") for host, result in results_callback.host_failed.items(): print('{0} >>> {1}'.format(host, result._result['msg'])) print("DOWN *********") for host, result in results_callback.host_unreachable.items(): print('{0} >>> {1}'.format(host, result._result['msg'])) if __name__ == '__main__': host_list = ['localhost', '119.3.251.91', '119.3.212.154'] room306 = ['114.116.241.141'] tasks=[ dict(action=dict(module='shell', args="date +'%Y-%m-%d %T'"), register='shell_out'), #dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))), #dict(action=dict(module='command',args=dict(cmd='/usr/bin/uptime'))), ] tasks_close=[ dict(action=dict(module='shell', args='shutdown -h now'), register='shell_out'), ] #do_ansible(host_list,tasks) do_ansible(room306,tasks)