userName 2024-08-25 04:20:21 +00:00
commit ba8e5ee95a
34 changed files with 790 additions and 0 deletions

9
README.md Normal file

@@ -0,0 +1,9 @@
# Initialize the Alembic migration environment:
alembic init alembic

# In alembic.ini, point Alembic at the database (the key lives under [alembic]):
# sqlalchemy.url = postgresql+psycopg2://user:password@localhost/dbname

# In alembic/env.py, expose the models' metadata for --autogenerate:
import sys
import os.path
sys.path.append(os.path.realpath('../addons/base/models'))
import ir_module_module
target_metadata = ir_module_module.Base.metadata
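A typical workflow once this is in place, using the same commands that alembic_1.py and alembic_2.py in this commit wrap in subprocess calls (run from the project root):

cd server/database
alembic revision -m "Initial migration" --autogenerate
alembic upgrade head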

0
__init__.py Normal file


15
addons/__init__.py Normal file

@@ -0,0 +1,15 @@
import pkgutil
import os.path

__path__ = [
    os.path.abspath(path)
    for path in pkgutil.extend_path(__path__, __name__)
]
# import pkgutil: Python's standard-library helper for working with packages.
# import os.path: helpers for filesystem path handling.
# Path extension:
#   pkgutil.extend_path(__path__, __name__) scans sys.path for additional
#   directories that also provide this package, so 'addons' can be split
#   across several locations (the classic namespace-package pattern).
# Absolute-path conversion:
#   the list comprehension passes every path returned by extend_path through
#   os.path.abspath; the result is assigned back to __path__, replacing it.
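As a quick check of the effect, a minimal sketch, assuming the project root is on sys.path:

import addons
# every directory that contributes to the 'addons' package, as absolute paths
print(addons.__path__)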


1
addons/base/__init__.py Normal file

@@ -0,0 +1 @@
from . import models


@@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
{
    'name': 'Check Printing Base',
    'version': '1.0',
}
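Since the manifest is a plain Python dict literal, a tool can read it without importing the addon. A sketch using ast.literal_eval; the path is hypothetical and this is an assumption about how a loader might consume it, not something this commit does:

import ast

def read_manifest(path):
    # parse the dict literal without executing any addon code
    with open(path, encoding='utf-8') as f:
        return ast.literal_eval(f.read())

manifest = read_manifest('addons/base/__manifest__.py')  # hypothetical path
print(manifest['name'], manifest['version'])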



@@ -0,0 +1 @@
from . import ir_module_module


@@ -0,0 +1,86 @@
import os

from enum import Enum as PythonEnum

from fastapi import Depends, HTTPException, status, APIRouter
from pydantic import BaseModel
from sqlalchemy import Column, Integer, String, Enum
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session

from server.database.database import get_db

app = APIRouter()

# Declarative base shared by the models in this file
Base = declarative_base()

# Module states persisted in the database
class EnumState(PythonEnum):
    installable = 'installable'
    to_upgrade = 'to_upgrade'
    to_remove = 'to_remove'
    installed = 'installed'
    uninstallable = 'uninstallable'

# SQLAlchemy model
class IrModuleModule(Base):
    __tablename__ = "ir.module.module"
    id = Column(Integer, primary_key=True, index=True, doc="primary key")
    name = Column(String, doc="technical name")
    state = Column(Enum(*[member.value for member in EnumState], name='module_state'),
                   nullable=False, server_default=EnumState.uninstallable.value)

# States accepted by the API schema. This was originally a second class also
# named EnumState, shadowing the one above; it is renamed here. Note it carries
# two extra states ('uninstalled', 'to_install') that the database enum does
# not accept, so inserting those values would fail at the database level.
class EnumStateSchema(PythonEnum):
    installable = 'installable'
    to_upgrade = 'to_upgrade'
    to_remove = 'to_remove'
    installed = 'installed'
    uninstallable = 'uninstallable'
    uninstalled = 'uninstalled'
    to_install = 'to_install'

class PydanticIrModuleModule(BaseModel):
    name: str | None = None
    state: EnumStateSchema = EnumStateSchema.uninstallable  # default state

@app.post("/create")
async def create_admin(admin: PydanticIrModuleModule, db: Session = Depends(get_db)):
    new_admin = IrModuleModule(
        name=admin.name,
    )
    db.add(new_admin)
    db.commit()
    db.refresh(new_admin)
    return {"message": "hr_admin created successfully"}

# Returning ORM objects through response_model requires the schema to allow
# attribute access (orm_mode in Pydantic v1, from_attributes in v2).
@app.get("/search", response_model=list[PydanticIrModuleModule])
async def search_all(db: Session = Depends(get_db)):
    admin = db.query(IrModuleModule).all()
    return admin

def process_addons_folder(folder_path):
    addons_path = os.path.join(folder_path, 'addons')
    # Make sure the addons folder exists
    if not os.path.exists(addons_path):
        print(f"Addons folder not found at {addons_path}")
        return
    # Walk everything inside the addons folder
    for addon_dir in os.listdir(addons_path):
        addon_path = os.path.join(addons_path, addon_dir)
        # Only handle directories
        if os.path.isdir(addon_path):
            init_file_path = os.path.join(addon_path, '__init__.py')
            # If an __init__.py exists, append an import of this model
            if os.path.exists(init_file_path):
                with open(init_file_path, 'a') as f:
                    f.write('\nfrom . import ir_module_module\n')
            # If the module is flagged as active, create its table
            if check_module_activation(addon_dir):
                create_table_in_db(addon_dir)
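check_module_activation and create_table_in_db are called above but never defined in this commit; a minimal sketch of what they could look like (the 'installed'-state check and the blanket create_all are assumptions, not the project's actual logic):

from server.database.database import SessionLocal, engine

def check_module_activation(addon_name):
    # Hypothetical: a module counts as active when its row is in state 'installed'
    with SessionLocal() as session:
        record = session.query(IrModuleModule).filter_by(name=addon_name).first()
        return record is not None and record.state == 'installed'

def create_table_in_db(addon_name):
    # Hypothetical: create whatever tables the addon's models registered on Base
    Base.metadata.create_all(bind=engine)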


@@ -0,0 +1,28 @@
from passlib.context import CryptContext
import configparser
import os
import logging

def get_password_hash(password):
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    return pwd_context.hash(password)

# Initialize the configuration file
def read_conf():
    logger = logging.getLogger(__name__)
    if os.path.exists("./server/server.conf"):
        config_parser = configparser.ConfigParser()
        config_parser.read("./server/server.conf")
        # Keep the leading comment lines; ConfigParser drops them on write
        with open("./server/server.conf", "r") as f:
            lines = f.readlines()
        comments = [line for line in lines if line.strip().startswith("#")]
        config_parser.set("options", "admin_passwd", get_password_hash('admin'))
        current_file_path = os.path.abspath(__file__)
        current_directory = os.path.dirname(current_file_path)
        # The trailing comma matches the comma-separated addons_path format in server.conf
        addons_path = os.path.join(current_directory, 'addons,')
        logfile_path = os.path.join(current_directory, 'database/server.log')
        config_parser.set("options", "addons_path", addons_path)
        config_parser.set("options", "log_level", ':INFO')
        config_parser.set("options", "logfile", logfile_path)
        with open("./server/server.conf", "w") as configfile:
            configfile.writelines(comments)
            config_parser.write(configfile)
    logger.info("Configuration module finished")
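read_conf stores only the bcrypt hash of the password. Checking a candidate password later goes through passlib's verify; a minimal sketch:

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
stored_hash = pwd_context.hash('admin')
# verify() re-hashes the candidate using the salt embedded in stored_hash
assert pwd_context.verify('admin', stored_hash)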

0
database/__init__.py Normal file


116
database/alembic.ini Normal file

@@ -0,0 +1,116 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
# Use forward slashes (/) also on windows to provide an os agnostic path
script_location = alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library.
# Any required deps can installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = postgresql+psycopg2://fastapi:Sj89061189@localhost/fastapi
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
database/alembic/README Normal file

@@ -0,0 +1 @@
Generic single-database configuration.


83
database/alembic/env.py Normal file

@@ -0,0 +1,83 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
import sys
import os.path
sys.path.append(os.path.realpath('../addons/base/models'))
import ir_module_module
target_metadata = ir_module_module.Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()


@@ -0,0 +1,26 @@
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}

def upgrade() -> None:
    ${upgrades if upgrades else "pass"}

def downgrade() -> None:
    ${downgrades if downgrades else "pass"}


@@ -0,0 +1,56 @@
"""Initial migration
Revision ID: df08377a186d
Revises:
Create Date: 2024-08-12 09:44:54.105915
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'df08377a186d'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index('ix_admin_id', table_name='admin')
    op.drop_index('ix_admin_name', table_name='admin')
    op.drop_index('ix_admin_username', table_name='admin')
    op.drop_table('admin')
    op.drop_index('ix_products_id', table_name='products')
    op.drop_index('ix_products_name', table_name='products')
    op.drop_table('products')
    op.drop_index('ix_ir.module.module_state', table_name='ir.module.module')
    # ### end Alembic commands ###

def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_index('ix_ir.module.module_state', 'ir.module.module', ['state'], unique=False)
    op.create_table('products',
        sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False),
        sa.Column('name', sa.VARCHAR(), autoincrement=False, nullable=True),
        sa.Column('described', sa.VARCHAR(), autoincrement=False, nullable=True),
        sa.PrimaryKeyConstraint('id', name='products_pkey')
    )
    op.create_index('ix_products_name', 'products', ['name'], unique=True)
    op.create_index('ix_products_id', 'products', ['id'], unique=False)
    op.create_table('admin',
        sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False),
        sa.Column('name', sa.VARCHAR(), autoincrement=False, nullable=True),
        sa.Column('username', sa.VARCHAR(), autoincrement=False, nullable=True),
        sa.Column('password', sa.VARCHAR(), autoincrement=False, nullable=True),
        sa.Column('is_superuser', sa.BOOLEAN(), autoincrement=False, nullable=False),
        sa.PrimaryKeyConstraint('id', name='admin_pkey')
    )
    op.create_index('ix_admin_username', 'admin', ['username'], unique=True)
    op.create_index('ix_admin_name', 'admin', ['name'], unique=False)
    op.create_index('ix_admin_id', 'admin', ['id'], unique=False)
    # ### end Alembic commands ###

12
database/alembic_1.py Normal file

@@ -0,0 +1,12 @@
import subprocess

def run_service():
    # Activate the virtualenv, switch to the migration directory,
    # then autogenerate a revision script
    bash_cmd = """
    source /home/lqs1/app/venv/bin/activate && \
    cd /home/lqs1/app/server/database && \
    alembic revision -m "Initial migration" --autogenerate
    """
    subprocess.Popen(bash_cmd, shell=True, executable='/bin/bash')

if __name__ == '__main__':
    run_service()

19
database/alembic_2.py Normal file

@@ -0,0 +1,19 @@
import subprocess

def run_service():
    # Activate the virtual environment and run the migration.
    # The commands are combined into a single bash script string:
    # activate the environment, change directory, then run alembic.
    bash_cmd = """
    source /home/lqs1/app/venv/bin/activate && \
    cd /home/lqs1/app/server/database && \
    alembic upgrade head
    """
    # Run the script with Popen. shell=True because this is a shell snippet;
    # executable='/bin/bash' matters here, since the default /bin/sh may not
    # support 'source'.
    subprocess.Popen(bash_cmd, shell=True, executable='/bin/bash')

if __name__ == '__main__':
    run_service()
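Popen returns immediately and never checks the exit status. If the caller should block until the migration finishes and fail loudly on error, subprocess.run with check=True is the safer variant; a sketch under that assumption:

import subprocess

bash_cmd = "source /home/lqs1/app/venv/bin/activate && cd /home/lqs1/app/server/database && alembic upgrade head"
# blocks until the command exits; raises CalledProcessError on a non-zero status
subprocess.run(bash_cmd, shell=True, executable='/bin/bash', check=True)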

37
database/database.py Normal file

@@ -0,0 +1,37 @@
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
import configparser
import logging

logger = logging.getLogger(__name__)

config_parser = configparser.ConfigParser()
config_parser.read("./server/server.conf")
host = config_parser.get('options', 'db_host')
port = config_parser.getint('options', 'db_port')
username = config_parser.get('options', 'db_user')
password = config_parser.get('options', 'db_password')
database = config_parser.get('options', 'db_name')
sqlname = config_parser.get('options', 'db_sqlname')
DATABASE_URL = f"{sqlname}://{username}:{password}@{host}:{port}/{database}"

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# Yield a database session, closing it when the request is done
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
        logger.info("Database session closed")

# Check that the database is reachable
def check_database_connection():
    try:
        with SessionLocal() as db:
            db.execute(text("SELECT 1"))
        logger.info("Database connection test completed")
        return True
    except Exception as e:
        logger.info(f"Could not connect to the database! Error: {e}")
        return False
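Outside a FastAPI dependency, the get_db generator can be driven by hand. A minimal sketch of the lifecycle, assuming the package layout used above:

from server.database.database import get_db
from sqlalchemy import text

gen = get_db()
db = next(gen)          # opens the session
try:
    db.execute(text("SELECT 1"))
finally:
    gen.close()         # resumes the generator's finally:, which closes the session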

88
main.py Normal file

@@ -0,0 +1,88 @@
from fastapi import FastAPI, Depends, HTTPException, status
import logging
from .database.database import engine
from .addons.base.models.ir_module_module import Base
from sqlalchemy.orm import Session

# Create the tables when the application starts
Base.metadata.create_all(bind=engine)

app = FastAPI()

# Temporary test imports
from server.database.database import check_database_connection
from server.addons.base.models.res_config_settings import read_conf
from .database.database import get_db
from importlib import import_module
import importlib.util
import os
from pathlib import Path

def init_server_conf():
    logging.basicConfig(level=logging.DEBUG, format='%(message)s')
    logging.info("fastapi finished")

@app.get("/")
async def root():
    read_conf()
    check_database_connection()
    init_server_conf()

# app/main.py
# The block below is a second, self-contained sample that was pasted in;
# note it rebinds engine, SessionLocal and Base, shadowing the imports above.
import pkgutil
import importlib
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# Database configuration
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# Automatically load the models
def load_models(directory):
    package_name = directory.name
    for _, module_name, _ in pkgutil.iter_modules([str(directory)]):
        full_module_name = f"{package_name}.{module_name}"
        importlib.import_module(full_module_name)

# Load every model
models_directory = Path(__file__).parent / "models"
load_models(models_directory)

# Create the database tables
Base.metadata.create_all(bind=engine)

# Automatically load the modules
def load_modules(directory):
    package_name = directory.name
    for _, module_name, _ in pkgutil.iter_modules([str(directory)]):
        full_module_name = f"{package_name}.{module_name}"
        importlib.import_module(full_module_name)

# Load every module
modules_directory = Path(__file__).parent / "modules"
load_modules(modules_directory)

# Register each module's router on the application
for _, module_name, _ in pkgutil.iter_modules([str(modules_directory)]):
    module = importlib.import_module(f"app.modules.{module_name}.routes")
    if hasattr(module, "router"):
        app.include_router(module.router)
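The loop above implies a convention: each app.modules.<name>.routes module exposes a router attribute. A minimal sketch of such a module; the module name and prefix are hypothetical, only the router attribute is implied by this commit:

# app/modules/example/routes.py  (hypothetical module)
from fastapi import APIRouter

router = APIRouter(prefix="/example")

@router.get("/ping")
async def ping():
    return {"status": "ok"}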

36
requirements.txt Normal file

@@ -0,0 +1,36 @@
# requirements.txt
# pip install -r requirements.txt

# server framework
fastapi
# data validation
pydantic
# async PostgreSQL client
asyncpg
# SQLAlchemy ORM support
sqlalchemy
# HTTP form/multipart encoding
python-multipart
# ASGI server
uvicorn[standard]
# PostgreSQL database driver
psycopg2-binary
# password hashing
passlib
# JWT encoding/signing
pyJWT
# database migrations
alembic
# configuration file read/write
configparser

136
server.conf Normal file

@@ -0,0 +1,136 @@
# addons_path: path(s) to the server's addon modules; multiple paths are comma-separated.
# admin_passwd: server administrator password, stored here as a passlib hash.
# bin_path: path to binary executables.
# csv_internal_sep: internal field separator for CSV files; empty by default.
# data_dir: data storage directory; defaults to the user's .local/share/server directory.
# db_host: database host address; defaults to localhost.
# db_maxconn: maximum number of database connections.
# db_name: name of the default database.
# db_password: database password.
# db_port: database port; the driver default is used when unset.
# db_sslmode: database SSL mode.
# db_template: database template.
# dbfilter: database filter.
# default_productivity_apps: productivity apps enabled by default.
# demo: whether to load demo data.
# email_from: sender address for outgoing mail.
# from_filter: mail filter.
# geoip_database: location of the GeoIP database.
# gevent_port: gevent service port.
# http_enable: whether to enable the HTTP service.
# http_interface: interface the HTTP service listens on.
# http_port: port the HTTP service listens on.
# import_partial: whether to allow partial imports.
# limit_memory_hard: hard memory limit.
# limit_memory_soft: soft memory limit.
# limit_request: maximum size of a single request.
# limit_time_cpu: CPU time limit per request.
# limit_time_real: wall-clock time limit per request.
# limit_time_real_cron: wall-clock time limit for cron jobs.
# list_db: whether to list all available databases.
# log_db: whether to log database operations.
# log_db_level: database log level.
# log_handler: log handler.
# log_level: log level.
# logfile: log file path.
# max_cron_threads: maximum number of cron worker threads.
# osv_memory_age_limit: age limit for in-memory objects.
# osv_memory_count_limit: count limit for in-memory objects.
# pg_path: path to the PostgreSQL client tools.
# pidfile: process ID file path.
# proxy_mode: whether to enable proxy mode.
# reportgz: whether to compress reports.
# screencasts: whether to record screencasts.
# screenshots: whether to record screenshots.
# server_wide_modules: server-wide modules.
# smtp_password: SMTP server password.
# smtp_port: SMTP server port.
# smtp_server: SMTP server address.
# smtp_ssl: whether to use SSL for the SMTP connection.
# smtp_ssl_certificate_filename: SSL certificate file name.
# smtp_ssl_private_key_filename: SSL private key file name.
# smtp_user: SMTP username.
# syslog: whether to log to the system log.
# test_enable: whether to enable test mode.
# test_file: test file path.
# test_tags: test tags.
# transient_age_limit: age limit for transient objects.
# translate_modules: modules to translate.
# unaccent: whether to strip accents.
# upgrade_path: upgrade path.
# websocket_keep_alive_timeout: WebSocket keep-alive timeout.
# websocket_rate_limit_burst: WebSocket rate-limit burst count.
# websocket_rate_limit_delay: WebSocket rate-limit delay.
# without_demo: whether to skip loading demo data.
# workers: number of worker processes.
# x_sendfile: whether to enable X-Sendfile support.
[options]
addons_path = /home/lqs1/app/server/addons/base/models/addons,
admin_passwd = $2b$12$nBrsS0ZDjpJwLRM.h7Vg8uyBaSPXjKeo7tpLzl1V1Do.bGvsJztMq
bin_path =
csv_internal_sep =
data_dir =
db_host = localhost
db_maxconn =
db_sqlname = postgresql
db_name = fastapi
db_user = fastapi
db_password = Sj89061189
db_port = 5432
db_sslmode =
db_template =
dbfilter =
default_productivity_apps =
demo =
email_from =
from_filter =
geoip_database =
gevent_port =
http_enable =
http_interface =
http_port =
import_partial =
limit_memory_hard =
limit_memory_soft =
limit_request =
limit_time_cpu =
limit_time_real =
limit_time_real_cron =
list_db = True
log_db = False
log_db_level = warning
log_handler = :INFO
log_level = :INFO
logfile = /home/lqs1/app/server/addons/base/models/database/server.log
max_cron_threads =
osv_memory_age_limit =
osv_memory_count_limit =
pg_path =
pidfile =
proxy_mode =
reportgz =
screencasts =
screenshots =
server_wide_modules =
smtp_password =
smtp_port =
smtp_server =
smtp_ssl =
smtp_ssl_certificate_filename =
smtp_ssl_private_key_filename =
smtp_user =
syslog =
test_enable =
test_file =
test_tags =
transient_age_limit =
translate_modules =
unaccent =
upgrade_path =
websocket_keep_alive_timeout =
websocket_rate_limit_burst =
websocket_rate_limit_delay =
without_demo =
workers =
x_sendfile =

17
server_1.py Normal file

@@ -0,0 +1,17 @@
# start_service.py
import subprocess

def run_service():
    # Virtualenv activation script
    activate_cmd = '/home/lqs1/app/venv/bin/activate'
    # Full command: activate the environment, change to the project
    # directory, then start uvicorn with auto-reload
    full_cmd = f'source {activate_cmd} && cd /home/lqs1/app/ && uvicorn server.main:app --reload'
    # Launch the command with Popen
    subprocess.Popen(full_cmd, shell=True, executable='/bin/bash')

if __name__ == '__main__':
    run_service()

16
server_2.py Normal file

@@ -0,0 +1,16 @@
# stop_service.py
import subprocess

def stop_service():
    # Find the uvicorn process IDs
    find_cmd = "ps aux | grep 'uvicorn server.main:app --reload' | grep -v grep | awk '{print $2}'"
    process = subprocess.run(find_cmd, shell=True, capture_output=True, text=True)
    # awk may print several PIDs (one per line), e.g. the reloader and its worker
    pids = process.stdout.split()
    for pid in pids:
        # Terminate each process
        subprocess.run(f"kill {pid}", shell=True)

if __name__ == '__main__':
    stop_service()
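Assuming pkill is available, an equivalent that sidesteps the ps/grep/awk pipeline:

import subprocess
# match the full command line of the uvicorn process and signal it
subprocess.run(["pkill", "-f", "uvicorn server.main:app --reload"])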