该程序的在线演示见 python_main.zzwqr.cn(本人的服务域名,请勿攻击!)😕

倒卖者不得好死!

main.py(程序入口;服务器上需使用 gunicorn 对外提供服务,可配合 nginx 做反向代理):

from flask import *
import os
from util import sql_command,System
import json
from add_pass import pass_key
import requests as rq
from sms import Sms


app=Flask(__name__)
# NOTE(review): the secret key and salt are hard-coded in source; main_key is
# used both as the Flask session secret and as the admin sign-up password
# checked in add_new_user. Consider loading them from config or environment.
main_key='密码'
main_salt = b"my salt"
app.secret_key = main_key
app.config['JSON_AS_ASCII'] = False
app.config['JSONIFY_MIMETYPE'] ="application/json;charset=utf-8"

# Load runtime settings. The context manager closes the file handle
# deterministically, and the explicit encoding keeps non-ASCII values readable
# regardless of the process locale.
with open('config.json','r',encoding='utf-8') as config_file:
    config=json.load(config_file)

# Shared service objects used by the route handlers
sql_cmd=sql_command()
system=System()
sms=Sms()

key_pass=pass_key()

@app.route('/')
def index():
    """Return the fixed greeting string ``'Hello'`` for the site root."""
    greeting = 'Hello'
    return greeting

@app.route('/admin_sms',methods=['POST'])
def admin_sms():
    """Send an SMS to the configured admin phone for an admin-owned token.

    Expects a JSON object body with a ``token`` field.

    Returns:
        The value returned by ``sms.send_sms`` when the token belongs to a
        user whose type is ``'admin'``; otherwise a JSON error payload:
        ``return_code`` ``'500'`` when no token was supplied, ``'400'`` when
        the token is unknown or its owner is not an admin.
    """
    payload = request.get_json(silent=True)
    token = payload.get('token') if isinstance(payload, dict) else None
    if token is None:
        return jsonify({'return_code':'500','msg':'Data error!'})

    token_row = sql_cmd.token_info(token)
    # An unknown token yields a row without an 'id' key, so the state has to
    # be checked before the owner is looked up.
    if token_row['state'] != '200':
        return jsonify({'return_code':'400','msg':'permissions error!'})
    if sql_cmd.get_user_msg(token_row['id'])['type'] != 'admin':
        return jsonify({'return_code':'400','msg':'permissions error!'})

    return sms.send_sms([config['admin_phone']])

# Visual admin page
@app.route('/admin',methods=['GET','POST'])
def admin():
    """Render the admin page with a listing of every registered route.

    Visitors without a ``user`` entry in the session are redirected to the
    login view; logged-in users whose type is not ``'admin'`` receive a JSON
    error with ``return_code`` ``'501'``.
    """
    if session.get('user') is None:
        return redirect(url_for('login'))

    account = sql_cmd.get_user_msg(session.get('id'))
    if account['type'] != 'admin':
        return jsonify({'return_code':'501','msg':'permissions error!'})

    # One "<rule><endpoint></br>" fragment per URL rule, concatenated in
    # iteration order.
    with app.test_request_context():
        route_listing = ''.join(
            "路由: "+str(rule.rule)+"视图函数: "+str(rule.endpoint)+'</br>'
            for rule in app.url_map.iter_rules()
        )

    return render_template('admin.html',html_data=route_listing)
    
# System information
@app.route('/system')
def system_get():
    """Return the OS version and machine architecture as one text line."""
    details = system.get_system_info()
    os_part = '操作系统版本:' + details['操作系统版本']
    arch_part = ',系统架构:' + details['系统架构']
    return os_part + arch_part


@app.route('/login',methods=['GET','POST'])
def login():
    """Show the login form (GET) or authenticate a user (POST).

    On POST the form fields ``user`` and ``pwd`` are checked against the
    database. On success a fresh token is issued, the session is populated
    with ``user``, ``id`` and ``token``, and ``{'token', 'state'}`` is
    returned as JSON.

    Returns:
        The rendered ``login.html`` for GET; for POST either the token JSON
        or an error payload (``'401'`` when a form field is missing, ``'400'``
        when the credentials do not match).
    """
    if request.method != 'POST':
        return render_template('login.html')

    user = request.form.get('user')
    password = request.form.get('pwd')
    if user is None or password is None:
        return jsonify({'return_code':'401','msg':'The form data is error!'})

    # Issue the token in-process instead of POSTing the plaintext password
    # over HTTP to this application's own /token route: that round trip had no
    # timeout, could stall a single-worker server waiting on itself, and
    # raised KeyError whenever the remote call failed.
    token_data = sql_cmd.add_token(user, password)
    if token_data['state'] != '200':
        return jsonify({'return_code':'400','msg':'The user and password is fail!'})

    session['user'] = user
    session['id'] = sql_cmd.get_id(user, password)
    session['token'] = token_data['token']
    return jsonify({'token': token_data['token'], 'state': '200'})
        

# API
# Login / token management
@app.route('/token',methods=['POST'])
def token():
    """Issue an access token for a username/password pair.

    Expects a JSON object with ``username`` and ``password``.

    Returns:
        JSON ``{'token', 'state': '200'}`` on success, ``state`` ``'400'``
        when the credentials are rejected, or ``state`` ``'505'`` when the
        body is missing, is not a JSON object, or lacks either field.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # Treat an absent/malformed body like a body with no fields so the
        # caller gets the documented error payload instead of a server error.
        payload = {}

    username = payload.get('username')
    password = payload.get('password')
    if username is None or password is None:
        return jsonify({'state':'505','msg':'Not have data!'})

    token_data = sql_cmd.add_token(username, password)
    if token_data['state'] != '200':
        return jsonify({'state':'400','msg':'Username or password fail!'})
    return jsonify({'token':token_data['token'],'state':'200'})


# File operations
# List all files
@app.route('/file',methods=['GET','POST'])
def file():
    """List the contents of ``./file`` for admin users.

    GET uses the browser session and returns an HTML list of download links;
    POST expects a JSON object with a ``token`` field and returns the file
    names as a JSON array.

    Returns:
        GET: the HTML listing, a redirect to the login view when not logged
        in, or a ``'501'`` JSON error for non-admin users.
        POST: the JSON array, or a JSON error (``'400'`` no token supplied,
        ``'444'`` unknown token, ``'406'`` token owner is not an admin).
    """
    from html import escape
    from urllib.parse import quote

    if request.method == 'GET':
        if session.get('user') is None:
            return redirect(url_for('login'))
        if sql_cmd.get_user_msg(session.get('id'))['type'] != 'admin':
            return jsonify({'return_code':'501','msg':'permissions error!'})
        # File names are not trusted markup: percent-encode them for the URL
        # and HTML-escape them for the link text.
        links = [
            '<a href="/file_get/' + quote(name) + '">' + escape(name) + '</a><br>'
            for name in os.listdir('./file')
        ]
        return ''.join(links)

    payload = request.get_json(silent=True)
    token = payload.get('token') if isinstance(payload, dict) else None
    if token is None:
        return jsonify({'return_code':'400','msg':'No have data!'})

    # Look the token up once and reuse the row for both checks.
    token_row = sql_cmd.token_info(token)
    if token_row['state'] != '200':
        return jsonify({'return_code':'444','msg':'token error!'})
    if sql_cmd.get_user_msg(token_row['id'])['type'] != 'admin':
        return jsonify({'return_code':'406','msg':'permissions error!'})
    return jsonify(os.listdir('./file'))
        
# Download a file
@app.route('/file_get/<filename>')
def return_file(filename):
    """Send ``filename`` from ``./file`` as an attachment.

    Returns:
        The file response, or the plain text ``'Down error!'`` when the file
        cannot be served.
    """
    # NOTE(review): unlike the /file listing, this route performs no
    # session/token check — confirm that anonymous downloads are intended.
    try:
        return send_from_directory('./file', filename, as_attachment=True)
    except Exception:
        # Was a bare ``except:``, which also swallowed SystemExit and
        # KeyboardInterrupt; ordinary failures keep the original response.
        return 'Down error!'
# Upload a file
@app.route('/upload', methods=['POST'])
def upload_file():
    """Store the uploaded form file ``file`` under ``uploads/``.

    Returns:
        ``"No file part"`` when the request carries no ``file`` field,
        ``"No selected file"`` when the file has no usable name, otherwise
        ``"File uploaded successfully"``.
    """
    # NOTE(review): this route performs no session/token check — confirm that
    # anonymous uploads are intended.
    if 'file' not in request.files:
        return "No file part"
    file = request.files['file']
    # Browsers submit an empty part without a filename when nothing was chosen.
    if not file.filename:
        return "No selected file"
    # The filename is client-controlled: keep only its last path component
    # (for both '/' and '\\' separators) so names such as '../../main.py'
    # cannot escape the uploads directory.
    safe_name = os.path.basename(file.filename.replace('\\', '/'))
    if safe_name in ('', '.', '..'):
        return "No selected file"
    file.save('uploads/'+safe_name)
    return "File uploaded successfully"



# Database operations
@app.route('/user/get_all_user',methods=['POST'])
def all_user():
    """Return every user record as JSON for an admin-owned token.

    Expects a JSON object body with a ``token`` field.

    Returns:
        A JSON object keyed by user name, each value holding ``user_name``,
        ``password`` and ``id``; or a JSON error payload (``'500'`` when no
        token was supplied, ``'400'`` when the token is unknown or its owner
        is not an admin).
    """
    payload = request.get_json(silent=True)
    token = payload.get('token') if isinstance(payload, dict) else None
    if token is None:
        return jsonify({'return_code':'500','msg':'Data error!'})

    token_row = sql_cmd.token_info(token)
    # An unknown token yields a row without an 'id' key, so the state has to
    # be checked before the owner is looked up.
    if token_row['state'] != '200':
        return jsonify({'return_code':'400','msg':'permissions error!'})
    if sql_cmd.get_user_msg(token_row['id'])['type'] != 'admin':
        return jsonify({'return_code':'400','msg':'permissions error!'})

    # NOTE(review): the response includes each stored password value —
    # confirm this exposure is intended.
    users = {
        str(row[0]): {"user_name":str(row[0]),"password":str(row[1]),"id":row[2]}
        for row in sql_cmd.get_all_user()
    }
    return jsonify(users)
    
@app.route('/user/new_user',methods=['POST'])
def add_new_user():
    """Create a user account from a JSON body.

    Expects a JSON object with ``username`` and ``password``. When the
    optional ``admin_password`` field equals ``main_key`` the account is
    created with type ``'admin'``; otherwise with the default type.

    Returns:
        JSON ``{'return_code': '200'}`` after the insert, or a ``'400'``
        error payload when the body is missing, is not a JSON object, or
        lacks either required field.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # Treat an absent/malformed body like a body with no fields so the
        # caller gets the documented error payload instead of a server error.
        payload = {}

    username = payload.get('username')
    password = payload.get('password')
    if username is None or password is None:
        return jsonify({'return_code':'400','message':'The data is wrone!'})

    # main_key is never None, so a missing admin_password simply fails the
    # comparison and falls through to the default account type.
    if payload.get('admin_password') == main_key:
        sql_cmd.into_new_user(username,password,type='admin')
    else:
        sql_cmd.into_new_user(username,password)
    return jsonify({'return_code':'200'})




    
if __name__=='__main__':
    # Direct execution starts Flask's built-in server on all interfaces.
    # The port is passed as an integer, the type Flask documents for it,
    # rather than relying on a string being coerced.
    app.run(host='0.0.0.0',port=3456)

util.py(工具文件,主要包含数据库操作类):

import sys
sys.path.append('/www/server/pyporject_evn/python_site_venv/lib/python3.11/site-packages')
import pymysql
import random
import string
import time
import platform

class sql_command:
    """Data-access helper built on a single PyMySQL connection and cursor.

    All queries pass their values as bound parameters (``%s`` placeholders)
    so that user-supplied text is escaped by the driver instead of being
    spliced into the SQL string.

    NOTE(review): one connection/cursor pair is created in ``__init__`` and
    reused by every method; passwords are stored and compared as given.
    Both are kept as in the original design — confirm they are acceptable.
    """

    def __init__(self):
        """Open the database connection and create the shared cursor."""
        conn = pymysql.connect(
            host='127.0.0.1',    # database host (the local machine)
            port=3306,           # database port
            user='python',       # database user name
            password='',         # database password
            database='python',   # schema to operate on
        )
        self.cursor = conn.cursor()
        self.conn=conn

    def get_user_msg(self,id):
        """Return the ``user`` row with the given id as a dict.

        Returns:
            ``{'user', 'password', 'id', 'type'}`` taken from the first four
            columns of the first matching row.

        Raises:
            IndexError: when no row matches ``id``.
        """
        self.cursor.execute('select * from user where id=%s', (id,))
        rows = self.cursor.fetchall()
        if not rows:
            raise IndexError(f'no user with id {id!r}')
        user = rows[0]
        return {'user':user[0],'password':user[1],'id':user[2],'type':user[3]}

    def get_id(self,user,password):
        """Return the id (third column) of the row matching both credentials.

        Raises:
            IndexError: when no row matches.
        """
        self.cursor.execute(
            'select * from user where user=%s and password=%s',
            (user, password),
        )
        rows = self.cursor.fetchall()
        if not rows:
            raise IndexError('no user matches the given credentials')
        return rows[0][2]

    def generate_random_string(self,length=12):
        """Return ``length`` random ASCII letters and digits.

        The characters are drawn with the ``secrets`` module because the
        result is used as an access token by ``add_token``.
        """
        import secrets

        characters = string.ascii_letters + string.digits
        return ''.join(secrets.choice(characters) for _ in range(length))

    def have_user(self,user,password):
        """Return True when at least one row matches both credentials."""
        self.cursor.execute(
            'select * from user where user=%s and password=%s',
            (user, password),
        )
        # Commit after the read is kept from the original (presumably to end
        # the transaction so later reads are not stale — TODO confirm).
        self.conn.commit()
        return len(self.cursor.fetchall()) >= 1

    def add_token(self,user,password):
        """Create and store a new token for valid credentials.

        Returns:
            ``{'state': '200', 'token': <token>}`` after the token row has
            been inserted and committed, or ``{'state': '400'}`` when the
            credentials do not match any user.
        """
        if not self.have_user(user,password):
            return {'state':'400'}

        token=self.generate_random_string()
        id=self.get_id(user,password)
        # id and the timestamp are sent as strings, matching the quoted
        # literals the original statement produced.
        self.cursor.execute(
            "INSERT INTO `token`(`token`, `user_name`, `id`, `date`) VALUES (%s,%s,%s,%s)",
            (token, user, str(id), str(int(time.time()))),
        )
        self.conn.commit()
        return {'state':'200','token':token}

    def get_all_user(self):
        """Return every row of the ``user`` table."""
        self.cursor.execute('select * from user;')
        return self.cursor.fetchall()

    def into_new_user(self,username,password,type='user'):
        """Insert a new user row and commit it.

        The new id is the current number of user rows plus one.

        Returns:
            ``{'state': '200'}`` after the insert has been committed.
        """
        # NOTE(review): count-based ids can collide after deletions or under
        # concurrent inserts — confirm against the table's key definition.
        id=len(self.get_all_user())+1
        # The statement is no longer printed: it contained the password.
        self.cursor.execute(
            "INSERT INTO `user`(`user`, `password`, `id`, `type`) VALUES (%s,%s,%s,%s)",
            (username, password, str(id), type),
        )
        self.conn.commit()
        return {'state':'200'}

    def token_info(self,token):
        """Look up a token.

        Returns:
            ``{'state': '200', 'user_name', 'id', 'date'}`` built from
            columns 1-3 of the first matching row, or
            ``{'state': '400', 'msg': ...}`` when the token is unknown.
        """
        self.cursor.execute('select * from token where token=%s', (token,))
        # Commit after the read is kept from the original (presumably to end
        # the transaction so later reads are not stale — TODO confirm).
        self.conn.commit()
        rows=self.cursor.fetchall()
        if len(rows)>=1:
            row=rows[0]
            return {'state':'200','user_name':row[1],'id':row[2],'date':row[3]}
        return {'state':'400','msg':'No have this token!'}
        
class System:
    """Reports basic facts about the host platform."""

    def __init__(self):
        """No state is kept; nothing to initialise."""

    def get_system_info(self):
        """Return the OS identification and machine architecture.

        Returns:
            A dict with two entries: ``'操作系统版本'`` holds
            ``platform.system()`` immediately followed by
            ``platform.release()`` (no separator), and ``'系统架构'`` holds
            ``platform.machine()``.
        """
        info = {}
        info['操作系统版本'] = platform.system() + platform.release()
        info['系统架构'] = platform.machine()
        return info

sms.py(短信发送模块,需要开通腾讯云短信服务):

# -*- coding: utf-8 -*-
from tencentcloud.common import credential
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.sms.v20210111 import sms_client, models
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
import random
class Sms:
    """Sends template SMS messages through the Tencent Cloud SMS client."""

    def __init__(self):
        """Build the SMS client from the credentials and HTTP profile."""
        # Credentials are left blank here and must be filled in for real use.
        Id=""
        Skey=""
        cred = credential.Credential(Id,Skey)
        httpProfile = HttpProfile()
        httpProfile.reqMethod = "POST"
        httpProfile.reqTimeout = 30
        httpProfile.endpoint = "sms.tencentcloudapi.com"
        clientProfile = ClientProfile()
        clientProfile.signMethod = "TC3-HMAC-SHA256"
        clientProfile.language = "en-US"
        clientProfile.httpProfile = httpProfile
        client = sms_client.SmsClient(cred, "ap-guangzhou", clientProfile)
        self.client=client

    def send_sms(self,PhoneNumber,temp=None):
        """Send one template SMS to the given recipients.

        Args:
            PhoneNumber: a phone number string, or an iterable of them.
            temp: values for the template placeholders. When omitted, a
                fresh random six-digit code is generated for this call.

        Returns:
            The provider response serialised as indented JSON text.
        """
        # The previous default, ``[random.randint(...)]``, was evaluated once
        # at definition time, and both arguments were ignored in favour of
        # hard-coded values.
        # NOTE(review): a generated code is not returned to the caller —
        # pass ``temp`` explicitly if the value must be checked later.
        if temp is None:
            temp = [random.SystemRandom().randint(100000,999999)]
        if isinstance(PhoneNumber, str):
            PhoneNumber = [PhoneNumber]

        req = models.SendSmsRequest()
        # Application id, signature and template id are left blank here and
        # must be filled in for real use.
        req.SmsSdkAppId = ""
        req.SignName = ""
        req.TemplateId = ""
        req.TemplateParamSet = [str(value) for value in temp]
        req.PhoneNumberSet = list(PhoneNumber)
        req.SessionContext = ""
        req.ExtendCode = ""
        req.SenderId = ""
        resp = self.client.SendSms(req)
        body = resp.to_json_string(indent=2)
        print(body)
        return body

add_pass.py(加密工具模块,供后续使用):

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.ciphers import (Cipher, algorithms, modes)
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
import os
class pass_key:
    """Password-based symmetric encryption built on PBKDF2-HMAC-SHA256 + Fernet."""

    def generate_key(self,password, salt, iterations=100000):
        """Derive a raw 32-byte key from ``password`` and ``salt``.

        Args:
            password: the secret, as ``bytes`` or ``str`` (encoded as UTF-8).
            salt: salt bytes passed to the KDF.
            iterations: PBKDF2 iteration count.

        Returns:
            The 32 derived key bytes (not base64-encoded).
        """
        if isinstance(password, str):
            # The KDF only accepts bytes.
            password = password.encode("utf-8")
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=iterations,
        )
        return kdf.derive(password)

    def _fernet(self, password, salt, iterations):
        """Return a Fernet instance keyed from the derived password key."""
        import base64

        # Fernet requires the 32 key bytes in URL-safe base64 form; handing it
        # the raw KDF output makes the constructor raise ValueError.
        raw_key = self.generate_key(password, salt, iterations)
        return Fernet(base64.urlsafe_b64encode(raw_key))

    def encrypt(self,message, password, salt, iterations=100000, key_length=32):
        """Encrypt ``message`` and return the Fernet token bytes.

        Args:
            message: plaintext as ``str`` (encoded as UTF-8) or ``bytes``.
            password: secret used to derive the key (``str`` or ``bytes``).
            salt: salt bytes for the key derivation.
            iterations: PBKDF2 iteration count.
            key_length: accepted for backward compatibility and not used;
                the derived key is always 32 bytes, as Fernet requires.
        """
        if isinstance(message, str):
            message = message.encode("utf-8")
        return self._fernet(password, salt, iterations).encrypt(message)

    def decrypt(self,encrypted_message, password, salt, iterations=100000, key_length=32):
        """Decrypt a token produced by ``encrypt`` and return the plaintext bytes.

        ``password``, ``salt`` and ``iterations`` must match the values used
        for encryption. ``key_length`` is accepted for backward compatibility
        and not used.
        """
        return self._fernet(password, salt, iterations).decrypt(encrypted_message)