from sqlalchemy import create_engine, Column, Integer, String from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from .configparser import DATABASE_URL import logging import psycopg2 from psycopg2 import sql from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel, Field from ....main import app from sqlalchemy.orm import Session from pydantic import ValidationError router = APIRouter() from passlib.context import CryptContext logger = logging.getLogger(__name__) Base = declarative_base() #哈希功能 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def get_password_hash(password): return pwd_context.hash(password) class ResPartner(Base): __tablename__ = 'res.artner' id = Column(Integer, primary_key=True) password = Column(String, nullable=False) phone = Column(String, nullable=False) language = Column(String, nullable=False) region = Column(String, nullable=False) class ResPartnerValidationModel(BaseModel): masterPassword: str databaseName: str email: str password: str phoneNumber: str selectedLanguage: str selectedCountry: str selectedDemoData: str # masterPassword: masterPassword.value, # databaseName: databaseName.value, # email: email.value, # password: password.value, # phoneNumber: phoneNumber.value, # selectedLanguage: selectedLanguage.value, # selectedCountry: selectedCountry.value, # selectedDemoData: selectedDemoData.value, # 创建数据库引擎 engine = create_engine(DATABASE_URL + '?client_encoding=utf8') # 显式指定编码 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) def get_db(): db = SessionLocal() try: yield db finally: db.close() logger.info("数据库连接关闭") @router.post("/manager/create", response_model=dict) def create_database(response_data_model: ResPartnerValidationModel, db: Session = Depends(get_db)): conn = None createdbname = response_data_model.databaseName try: conn = psycopg2.connect(DATABASE_URL) conn.autocommit = True # 设置自动提交模式 cursor = conn.cursor() # 使用 sql.SQL 和 sql.Identifier 安全地构建SQL语句 query = sql.SQL("CREATE DATABASE {}").format(sql.Identifier(createdbname)) cursor.execute(query) logger.info("创建数据库成功",{createdbname}) create_res = ResPartner( name=response_data_model.name, username=response_data_model.username, password=get_password_hash(response_data_model.password) ) db.add(create_res) db.commit() db.refresh(create_res) # masterPassword: str # databaseName: str # email: str # password: str # phoneNumber: str # selectedLanguage: str # selectedCountry: str # selectedDemoData: str #自动创建全部表 return {"message": "Database created successfully"} # except ValidationError as e: # raise HTTPException(status_code=422, detail=e.errors()) except Exception as e: logger.error("创建数据库失败: %s", e) finally: if conn is not None: conn.close() def create_teable(): Base.metadata.create_all(engine) logger.info("创建全部表成功") def setup_admin(): db = None try: db = SessionLocal() admin = ResPartner(password='Sj89061189', phone='13929931775', language='zh-CN', region='China') db.add(admin) db.commit() db.refresh(admin) logger.info("创建超级管理员成功: %s", admin.id) except Exception as e: logger.error("创建超级管理员失败: %s", e) if db is not None: db.rollback() finally: if db is not None: db.close()