使用SQLAlchemy操作数据库表过程解析

时间:2021-05-22

需求场景:

使用sqlalchmy从现有的表中获取数据(不是自己建表)。百度了一下,网上都是使用sqlalchemy自己先创建表,然后导入数据表的模型类进行增删改查;现在不是自己建表,该如何操作呢?

操作方案

通过sqlalchmey执行原生的sql语句,增删改查的原生语句携带表名,就不需要导入数据表的模型类了。

使用的包:

SQLAlchemy (1.3.10) + mysql-connector-python (8.0.19)

提供以下干货:

  • 演示了向原生sql语句传递变量的用法 即动态执行sql语句 更加灵活
  • 通过执行原生的sql语句实现操作已有的表
  • 演示了sql语句根据多字段排序的方法等

DEMO

# -*- coding:utf-8 -*-from sqlalchemy import create_engine,MetaData,Table,existsfrom sqlalchemy.orm import sessionmaker, scoped_sessionfrom util.Log import Logfrom conf.parseConfig import parseConf# 从配置文件中获取mysql的配置信息host = parseConf.get_conf('MySQLInfo', 'host')port = parseConf.get_conf('MySQLInfo', 'port')dbname = parseConf.get_conf('MySQLInfo', 'dbname')usernm = parseConf.get_conf('MySQLInfo', 'usernm')passwd = parseConf.get_conf('MySQLInfo', 'passwd')engine_str = "mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}".format(usernm, passwd, host, port, dbname)class OpsMysql(object): def __init__(self, log=Log(__file__).getlog()): self.log = log self.session = None try: self.engine = create_engine( engine_str, max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1, # 多久之后对线程池中的线程进行一次连接的回收(重置) # echo=True, # 显示相应执行的 sql 指令 encoding='utf-8' ) SessionFactory = sessionmaker(bind=self.engine) self.session = scoped_session(SessionFactory) except Exception as e: self.log.error(str(e)) self.log.error("Connect {0}@{1}:{2} failed!".format(dbname, host, port)) def get_session(self): return self.session def getEngine(self): return self.engine def init_db(self, base): base.metadata.create_all(self.engine) def drop_db(self, base): base.metadata.drop_all(self.engine)if __name__ == "__main__": log = Log(__file__).getlog() tt = OpsMysql(log) session = tt.get_session() if session: # 通过执行原生的sql语句实现操作已有的表 # 此处演示了向原生sql语句传递变量的用法 即动态执行sql语句 更加灵活 mail_id = 1 res = session.execute('select * from tbl_mail_addr where mail_id='" + mail_id + "' and mail_tp="c"') res01 = res.fetchall() # 结果是列表 print(res01[0]) # (1, 'c', 1, 'XX@u163.com') mail_id = '1' mail_type = 'c' # 查询id为1,类型是c的邮箱信息,并按mail_tp降序,addr_no升序排列,限制查询数量100 sql = "select * from tbl_mail_addr where tbl_mail_addr.oper_flag='1' and tbl_mail_addr.mail_id='" + mail_id + "' and tbl_mail_addr.mail_tp='" + mail_type + "' order by tbl_mail_addr.mail_tp, tbl_mail_addr.addr_no ASC limit 100" result = session.execute(sql) value_list = result.fetchall() print(value_list) # [(1, 'c', 1, 'XX@163.com'), (1, 'c', 2, 'XX@qq.com), (1, 'c', 3, 'XX@qq.com')] session.close()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。

相关文章