数据库CRUD

一、安装lessweb-commondao

pip3 install lessweb-commondao

二、创建空白项目

mkdir schema
mkdir myapp
touch myapp/__init__.py
mkdir myapp/endpoint
touch myapp/endpoint/__init__.py
touch index.py
touch config.toml

index.py的内容为:

from lessweb.bridge import Bridge
from commondao.mapper import mysql_startup, mysql_cleanup, mysql_connect

def start_server():
    bridge = Bridge('config.toml')
    bridge.add_mod_ctx(mysql_startup, mysql_cleanup)
    bridge.add_middleware(mysql_connect)
    bridge.add_route_scan('myapp.endpoint')
    bridge.run_app()

if __name__ == '__main__':
    start_server()

config.toml的内容为:

[bootstrap]
port = 8080

[mysql]
pool_recycle = 1800
host = '???.??.??.???'
port = 3306
user = '?????'
password = '?????????'
db = '?????'
echo = false
autocommit = true

三、生成mapper.py

  1. 创建schema目录
  2. 创建schema/pet.sql文件
  3. 执行commondao codegen --output myapp/mapper.py

schema/pet.sql内容为:

CREATE TABLE `tbl_pet` (
  `pet_id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(100) DEFAULT NULL,
  `kind` enum('CAT','DOG','OTHER') DEFAULT NULL,
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`pet_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

四、实现endpoint

endpoint/create_pet.py

from lessweb.bridge import post_mapping
from myapp.mapper import CommonDao

@post_mapping('/pet')
async def create_pet(pet: dict, / , common_dao: CommonDao):
    await common_dao.insert_pet(**pet)
    return {}

endpoint/get_pet_detail.py

from lessweb.bridge import get_mapping
from myapp.mapper import CommonDao

@get_mapping('/pet/{pet_id}')
async def get_pet_detail(common_dao: CommonDao, * , pet_id: int):
    return await common_dao.select_pet_by_id(pet_id=pet_id)

endpoint/get_pet_total.py

from lessweb.bridge import get_mapping
from commondao.mapper import Mapper

@get_mapping('/pet-total')
async def get_pet_total(mapper: Mapper, kind: str):
    return await mapper.select_all('select count(*) as total, kind from tbl_pet group by kind where kind=:kind',
        {'kind': kind})  # 执行手写SQL,这是防止SQL注入的正确方法

运行python3 index.py即可访问服务。

五、分页查询

from aiohttp.web import Request
from lessweb.bridge import get_mapping
from myapp.mapper import CommonDao, QueryResult

@get_mapping('/pet')
async def get_pets(common_dao: CommonDao, request: Request)->QueryResult:
    return await common_dao.select_pet(request.query)

分页查询返回结果的字段为:

@dataclass
class QueryResult:
    total: int
    list: List[dict]
    limit: Optional[int] = None
    offset: Optional[int] = None

limitoffset同时也是请求的保留字,定义也跟MySQL语法保持一致,需要关注如果前端传的limit过大所产生的问题。如果不传limit或者limit传0,相当于只查询总数,框架会进行优化,只查询一次数据库。

你还可以用order=<field1>,-<field2>的格式,指定一个或多个字段作为排序字段。注意字段名前面带- 表明是desc排序,不带-则是asc排序。多个字段名用逗号分割。

查询时最重要的功能是指定各个字段的filter,以下图的查询需求为例:

example1

请求的query应该是:

kind=CAT&name.filter=contains&name=kitty&create_time.filter=between&create_time=2023-01-01%2000%3A00%3A00&create_time.end=2023-01-31%2023%3A59%3A59

综合运用这些查询语法,如果希望每页显示20条结果,查看第2页,并且基于create_timepet_id两个字段倒序排序,则query可以是:

kind=CAT&...&create_time.end=2023-01-31%2023%3A59%3A59&limit=20&offset=20&order=-create_time,-pet_id

目前commondao自带的filter清单如下:

eq       => `<field>`=:<field>.value  # eq是默认的filter,可以省略
like     => `<field>` like :<field>.value
between  => `<field>` between :<field>.value and :<field>.end
ne       => `<field>` <> :<field>.value
lt       => `<field>` < :<field>.value
gt       => `<field>` > :<field>.value
lte      => `<field>` <= :<field>.value
gte      => `<field>` >= :<field>.value
in       => `<field>` in :<field>.value

需要注意,很多时候出于安全考虑,需要对前端可请求的参数进行限制,由后端拼接实际的query,例如刚才的get_pets函数就可以改成这样:

@get_mapping('/pet')
async def get_pets(common_dao: CommonDao, *, offset: int, name: str=None):
    query = {'limit': 20, 'offset': offset, 'order': '-create_time,-pet_id'}
    if name:
        query.update({'name.filter': 'contains', 'name': name})
    return await common_dao.select_pet(query)