一、数据准备:配置数据库、创建数据库引擎、创建基类、创建session
from sqlalchemy import create_engine, Column, Integer, ForeignKey, String, TEXT, Boolean, DATE, DECIMAL
from sqlalchemy.ext.declarative import declarative_base
from datetime import date
from sqlalchemy.orm import sessionmaker,relationship,backref
#配置数据库
HOSTNAME = '127.0.0.1'
PORT = '3306'
DATABASE = 'test'
USERNAME = 'root'
PASSWORD = 'root'
DB_URI = 'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8mb4'.format(USERNAME, PASSWORD, HOSTNAME, PORT, DATABASE)
#创建数据库引擎
engine=create_engine(DB_URI)
#创建基类,所有的模型类继承基类
Base=declarative_base(engine)
#创建session
session=sessionmaker(engine)()
二、创建模型类(部门表和员工表)
#创建模型类
class Dept(Base):
__tablename__='t_dept2'
dept_no=Column(name='dept_no',type_=Integer,primary_key=True,autoincrement=True)
dept_name=Column(name='dept_name',type_=String(20))
city=Column(name='city',type_=String(20))
#代表当前部门下的所有员工列表,这种写法不是最好的,最优的写法只要在一个对象中关联就可以了
#emp=relationship('Emp') #参数必须是另一个关联模型类的类名
def __str__(self):
return f'部门编号:{self.dept_no}部门:{self.dept_name}城市:{self.city}'
class Emp(Base):
__tablename__='t_emp2'
emp_no=Column(name='emp_no',type_=Integer,primary_key=True,autoincrement=True)
emp_name=Column(name='emp_name',type_=String(20))
hire_date=Column(name='hire_date',type_=DATE)
sal=Column(name='sal',type_=DECIMAL(10,2))
#todo 设置外键关联,在从表中增加一个字段,指定这个字段外键的是哪个表的哪个字段就可以了。从表中外键的字段,必须和主表的主键字段类型保持一致。
dept_no=Column(ForeignKey('t_dept2.dept_no',ondelete='CASCADE'),name='dept_no',type_=Integer)
#代表员工所属的部门信息,backref:反向关联的属性名,lazy='dynamic:懒加载
dept=relationship('Dept',backref=backref('emps',lazy='dynamic')) #参数必须是另一个关联模型类的类名
def __str__(self):
return f'员工编号:{self.emp_no}员工姓名:{self.emp_name}员工入职时间:{self.hire_date}员工薪资:{self.sal}'
#根据模型类建表
#Base.metadata.create_all()
#Base.metadata.drop_all()
group_by
根据某个字段进行分组
having
having是对分组查找结果作进一步过滤
案例
def test_group():
# 统计每个工资级别下有多少员工
result=session.query(Emp.sal,func.count(Emp.emp_no)).group_by(Emp.sal).all()
print(result)
#统计每个部门下有多少员工
result1=session.query(Emp.dept_no,func.count(Emp.emp_no)).group_by(Emp.dept_no).all()
print(result1)
# 统计每个工资级别下有多少员工,只统计7000以上的
result2 = session.query(Emp.sal, func.count(Emp.emp_no)).group_by(Emp.sal).having(Emp.sal>7000).all()
print(result2)
执行结果:
[(Decimal('8888.88'), 1), (Decimal('5555.88'), 1), (Decimal('6000.00'), 1), (Decimal('10000.00'), 2), (Decimal('7777.77'), 1), (Decimal('8000.00'), 1)]
[(1, 5), (2, 2)]
[(Decimal('8888.88'), 1), (Decimal('10000.00'), 2), (Decimal('7777.77'), 1), (Decimal('8000.00'), 1)]
子查询
子查询即select语句中还有select。
那么在sqlalchemy中,要实现一个子查询,需以下几个步骤:
1. 将子查询按照传统的方式写好查询代码,然后在query对象后面执行subquery方法,将这个查询变成一个子查询。
2. 在子查询中,将以后需要用到的字段通过label方法,取个别名。
3. 在父查询中,如果想要使用子查询的字段,那么可以通过子查询的返回值上的c属性拿到(c=Column)。
案例
def test_subquery():
#查询和django这个员工,入职时间、工资都相同的其他员工
#第一步:写子查询
result3=session.query(Emp.hire_date.label('h_d'),Emp.sal.label('sal')).filter(Emp.emp_name=='django').subquery()
print(result3)
#第二步:写父查询
result4=session.query(Emp).filter(Emp.hire_date==result3.c.h_d,Emp.sal==result3.c.sal).all()
print(result4) #查看sql语句
for i in result4:
print(i)
SELECT t_emp2.hire_date AS h_d, t_emp2.sal AS sal
FROM t_emp2
WHERE t_emp2.emp_name = %(emp_name_1)s
[<__main__.Emp object at 0x000001BDEDEEBAF0>, <__main__.Emp object at 0x000001BDEDEEBB50>]
员工编号:6员工姓名:django员工入职时间:2021-12-12员工薪资:10000.00
员工编号:13员工姓名:mysql员工入职时间:2021-12-12员工薪资:10000.00