# -*- coding: UTF-8 -*- # Author : LinYaoHong # Date : 2019/8/6 11:13 # connsqlalchemy.py # Tools : PyCharm from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import and_, or_ from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index Base = declarative_base() engine = create_engine("mysql+pymysql://root:111111@127.0.0.1/testdb?charset=utf8", max_overflow=5) Session = sessionmaker(bind=engine) session = Session() from connsqlalchemy import UserInfo,UserType ''' 查询 分组 排序 连表 通配符 子查询 limit union ''' def result(ret): # 定义取结果的函数(下面重复调用) for row in ret: print(row.id, row.name, row.age, row.user_type_id) print('==================================条件====================================') print('------------------------ name = 钢铁侠----------------------------') ret = session.query(UserInfo).filter_by(name="钢铁侠").all() result(ret) ret = session.query(UserInfo).filter_by(name="蚁人").first() print(ret.id,ret.name) ''' filter_by 接受的是参数 filter接受的条件表达式(filter_by内部其实是调用的filter) 通过打印SQL语句可以看出,执行的语句一样 ''' print('------------------------ name = 钢铁侠---------------------------') ret = session.query(UserInfo).filter(UserInfo.name == "钢铁侠").all() result(ret) print("------------------------ id< 5 and name = '奇异博士'") ret = session.query(UserInfo).filter(UserInfo.id < 5, UserInfo.name == "奇异博士").all() result(ret) print('------------------------id在 1和 4 之间-----------------------------------') ret = session.query(UserInfo).filter(UserInfo.id.between(1, 4)).all() result(ret) print('------------------------id在 1和 4 之间 and name = 钢铁侠-------------------------------') ret = session.query(UserInfo).filter(UserInfo.id.between(1, 4), UserInfo.name == "钢铁侠").all() result(ret) print('------------------------id在1,3,4里 -------------------------------') ret = session.query(UserInfo).filter(UserInfo.id.in_([1, 3, 4])).all() result(ret) print('------------------------id不在在1,3,4里 -------------------------------') ret = session.query(UserInfo).filter(~UserInfo.id.in_([1, 3, 4])).all() result(ret) print('------------------------id>3 and name =奇异博士 -------------------------------') ret = session.query(UserInfo).filter(and_(UserInfo.id > 3, UserInfo.name == "奇异博士")).all() result(ret) print('------------------------id>3 or name =奇异博士 -------------------------------') ret = session.query(UserInfo).filter(or_(UserInfo.id > 3, UserInfo.name == "奇异博士")).all() result(ret) print('------------------------or和and连起来使用-------------------------------') ret = session.query(UserInfo).filter( or_( UserInfo.id < 2, and_(UserInfo.name == "美国队长", UserInfo.id > 3), UserInfo.name != "蜘蛛侠" )).all() result(ret) print('==================================通配符====================================') ret = session.query(UserInfo).filter(UserInfo.name.like("钢%")).all() # 非钢开头 ret = session.query(UserInfo).filter(~UserInfo.name.like("钢%")).all() ret = session.query(UserInfo).filter(UserInfo.name.like("钢铁_")).all() print('==================================限制====================================') ret = session.query(UserInfo)[0:2] print('==================================排序====================================') ret =session.query(UserInfo).order_by(UserInfo.age.desc()).all() print('----------------------当第一列有重复的时候再指定按照第二个排序') ret =session.query(UserInfo).order_by(UserInfo.age.desc(),UserInfo.id.asc()).all() print('==================================分组====================================') from sqlalchemy.sql import func print('----------------------只去重分组-----------------------') ret = session.query(UserInfo).group_by(UserInfo.name).all() for row in ret: print(row.name ) print('--------------------分组去重并统计个数------------------') ret = session.query(UserInfo.name, func.count(UserInfo.id) ).group_by(UserInfo.name).all() print(ret) print('--------------------------------其他用法--------------------------------') ret = session.query( UserInfo.name, func.max(UserInfo.age), func.sum(UserInfo.age), func.min(UserInfo.age)).group_by(UserInfo.name).all() print(ret) print('--------------------------------其他用法2--------------------------------') ret = session.query( UserInfo.name, func.max(UserInfo.id), func.sum(UserInfo.id), func.min(UserInfo.id)).group_by(UserInfo.name).having(func.min(UserInfo.id) >5).all() print(ret) print('==================================连表====================================') '''#SELECT * from userinfo,usertype where userinfo.user_type_id=usertype.id 两种方式 推荐join SELECT userinfo.id AS userinfo_id, userinfo.name AS userinfo_name, userinfo.age AS userinfo_age, userinfo.user_type_id AS userinfo_user_type_id FROM userinfo LEFT OUTER JOIN usertype ON usertype.id = userinfo.user_type_id isouter=True 表示 LEFT JOIN 不加则是 JOIN ''' ret = session.query(UserInfo).join(UserType,isouter=True).all() print('==================================组合====================================') q1 = session.query(UserInfo.name).filter(UserInfo.id>2) q2 = session.query(UserType.user_type).filter(UserType.id>3) ret = q1.union(q2).all() # 去重 print(ret) ret = q1.union_all(q2).all() # 不去重 print(ret) print('=========================扩展子查询临时表===================') # select * from b where id in () ret = session.query(UserInfo).filter(UserInfo.id.in_( session.query(UserInfo.id).filter_by(name="钢铁侠"))).all() for row in ret: print(row.id, row.name, row.age, row.user_type_id) print('----------------------子查询--------------------------') # 子查询 # select * from (select * from tb) as B # subquery() 定义为子查询 q1 = session.query(UserInfo).filter(UserInfo.id >10).subquery() result = session.query(q1).all() print(result) print('----------------------子查询 笛卡尔积 --------------------------') # select id ,(select * from tb where user_type_id=1) from xxx; # 没测试成功,日你吗 # ret = session.query(UserType.id,session.query(UserInfo).filter(UserInfo.user_type_id==UserType.id).as_scalar()) # print(ret) session.commit() # 提交 session.close() # 关闭