数据库CRUD
一、安装lessweb-commondao
pip3 install lessweb-commondao
二、创建空白项目
mkdir schema
mkdir myapp
touch myapp/__init__.py
mkdir myapp/endpoint
touch myapp/endpoint/__init__.py
touch index.py
touch config.toml
index.py的内容为:
from lessweb.bridge import Bridge
from commondao.mapper import mysql_startup, mysql_cleanup, mysql_connect
def start_server():
bridge = Bridge('config.toml')
bridge.add_mod_ctx(mysql_startup, mysql_cleanup)
bridge.add_middleware(mysql_connect)
bridge.add_route_scan('myapp.endpoint')
bridge.run_app()
if __name__ == '__main__':
start_server()
config.toml的内容为:
[bootstrap]
port = 8080
[mysql]
pool_recycle = 1800
host = '???.??.??.???'
port = 3306
user = '?????'
password = '?????????'
db = '?????'
echo = false
autocommit = true
三、生成mapper.py
- 创建schema目录
- 创建schema/pet.sql文件
- 执行
commondao codegen --output myapp/mapper.py
schema/pet.sql内容为:
CREATE TABLE `tbl_pet` (
`pet_id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(100) DEFAULT NULL,
`kind` enum('CAT','DOG','OTHER') DEFAULT NULL,
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`pet_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
四、实现endpoint
endpoint/create_pet.py
:
from lessweb.bridge import post_mapping
from myapp.mapper import CommonDao
@post_mapping('/pet')
async def create_pet(pet: dict, / , common_dao: CommonDao):
await common_dao.insert_pet(**pet)
return {}
endpoint/get_pet_detail.py
:
from lessweb.bridge import get_mapping
from myapp.mapper import CommonDao
@get_mapping('/pet/{pet_id}')
async def get_pet_detail(common_dao: CommonDao, * , pet_id: int):
return await common_dao.select_pet_by_id(pet_id=pet_id)
endpoint/get_pet_total.py
:
from lessweb.bridge import get_mapping
from commondao.mapper import Mapper
@get_mapping('/pet-total')
async def get_pet_total(mapper: Mapper, kind: str):
return await mapper.select_all('select count(*) as total, kind from tbl_pet group by kind where kind=:kind',
{'kind': kind}) # 执行手写SQL,这是防止SQL注入的正确方法
运行python3 index.py
即可访问服务。
五、分页查询
from aiohttp.web import Request
from lessweb.bridge import get_mapping
from myapp.mapper import CommonDao, QueryResult
@get_mapping('/pet')
async def get_pets(common_dao: CommonDao, request: Request)->QueryResult:
return await common_dao.select_pet(request.query)
分页查询返回结果的字段为:
@dataclass
class QueryResult:
total: int
list: List[dict]
limit: Optional[int] = None
offset: Optional[int] = None
limit
和offset
同时也是请求的保留字,定义也跟MySQL语法保持一致,需要关注如果前端传的limit
过大所产生的问题。如果不传limit
或者limit
传0,相当于只查询总数,框架会进行优化,只查询一次数据库。
你还可以用order=<field1>,-<field2>
的格式,指定一个或多个字段作为排序字段。注意字段名前面带-
表明是desc
排序,不带-
则是asc
排序。多个字段名用逗号分割。
查询时最重要的功能是指定各个字段的filter,以下图的查询需求为例:
请求的query应该是:
kind=CAT&name.filter=contains&name=kitty&create_time.filter=between&create_time=2023-01-01%2000%3A00%3A00&create_time.end=2023-01-31%2023%3A59%3A59
综合运用这些查询语法,如果希望每页显示20条结果,查看第2页,并且基于create_time
和pet_id
两个字段倒序排序,则query可以是:
kind=CAT&...&create_time.end=2023-01-31%2023%3A59%3A59&limit=20&offset=20&order=-create_time,-pet_id
目前commondao自带的filter清单如下:
eq => `<field>`=:<field>.value # eq是默认的filter,可以省略
like => `<field>` like :<field>.value
between => `<field>` between :<field>.value and :<field>.end
ne => `<field>` <> :<field>.value
lt => `<field>` < :<field>.value
gt => `<field>` > :<field>.value
lte => `<field>` <= :<field>.value
gte => `<field>` >= :<field>.value
in => `<field>` in :<field>.value
需要注意,很多时候出于安全考虑,需要对前端可请求的参数进行限制,由后端拼接实际的query,例如刚才的get_pets
函数就可以改成这样:
@get_mapping('/pet')
async def get_pets(common_dao: CommonDao, *, offset: int, name: str=None):
query = {'limit': 20, 'offset': offset, 'order': '-create_time,-pet_id'}
if name:
query.update({'name.filter': 'contains', 'name': name})
return await common_dao.select_pet(query)