import pymysql
import os,sys
from sshtunnel import SSHTunnelForwarder
sys.path.append(os.path.join(os.getcwd()))
import readConfig
from common.log import logger

# Module-level configuration reader, created once at import time and used by
# DataBaseHandle below for the MySQL and SSH settings.
localReadConfig = readConfig.ReadConfig()
# NOTE(review): this prints the configured MySQL username every time the module
# is imported — looks like leftover debug output; confirm before removing.
print(localReadConfig.get_mysql("username"))

class DataBaseHandle(object):
    """Run SQL against a MySQL server that is reached through an SSH tunnel.

    Creating an instance starts an ``SSHTunnelForwarder`` and opens a PyMySQL
    connection to the tunnel's local end. Call :meth:`closedb` when done, or
    use the instance as a context manager so the cleanup happens automatically.

    All connection settings are read from the module-level ``localReadConfig``.
    """

    # Local endpoint the SSH tunnel listens on.
    # NOTE(review): "0.0.0.0" makes the forwarded MySQL port listen on every
    # network interface of this machine, and a fixed port presumably prevents
    # two instances from being alive at the same time. The values are kept for
    # backward compatibility; override them in a subclass (for example with
    # "127.0.0.1") if only local access is needed.
    LOCAL_BIND_HOST = "0.0.0.0"
    LOCAL_BIND_PORT = 9527

    def __init__(self):
        """Start the SSH tunnel and connect to MySQL through it.

        Raises:
            Exception: whatever the tunnel start-up or ``pymysql.connect``
                raises. If the connect step fails, the tunnel that was already
                started is stopped before the error is re-raised.
        """
        # Define these first so the attributes exist even if start-up fails.
        self.cursor = None
        self.db = None
        self.server = self.get_server()
        self.server.start()
        try:
            self.db = pymysql.connect(
                host="127.0.0.1",
                user=localReadConfig.get_mysql("username"),
                password=localReadConfig.get_mysql("password"),
                database=localReadConfig.get_mysql("database"),
                port=self.server.local_bind_port,
                charset='utf8',
            )
        except Exception:
            # Do not leave the tunnel running when the DB connection
            # could not be established.
            self.server.stop()
            raise

    def __enter__(self):
        """Return the handle itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close cursor, connection and tunnel; never suppresses exceptions."""
        self.closedb()
        return False

    def get_server(self):
        """Build (but do not start) the SSH tunnel to the MySQL host.

        Returns:
            SSHTunnelForwarder: a forwarder configured from ``localReadConfig``
            that forwards ``LOCAL_BIND_HOST:LOCAL_BIND_PORT`` to the configured
            MySQL host and port.
        """
        ssh_address = localReadConfig.get_ssh("host")  # SSH server address
        ssh_port = int(localReadConfig.get_ssh("port"))
        server_user = localReadConfig.get_ssh("username")  # SSH login user
        server_password = localReadConfig.get_ssh("password")  # SSH login password
        server = SSHTunnelForwarder(
            ssh_address_or_host=(ssh_address, ssh_port),
            ssh_password=server_password,
            ssh_username=server_user,
            remote_bind_address=(
                localReadConfig.get_mysql("host"),
                int(localReadConfig.get_mysql("port")),
            ),
            # Local address/port the tunnel is bound to.
            local_bind_address=(self.LOCAL_BIND_HOST, self.LOCAL_BIND_PORT),
        )
        return server

    def _open_cursor(self):
        """Close the cursor left by the previous call and open a new one.

        The new cursor is stored on ``self.cursor`` (as before) and returned.
        """
        # Detach the old cursor first so a failing close() cannot leave a
        # stale cursor behind on the instance.
        previous, self.cursor = self.cursor, None
        if previous is not None:
            previous.close()
        self.cursor = self.db.cursor()
        return self.cursor

    def execute_sql(self, sql, args=None):
        """Execute a data-modifying statement and commit it.

        Args:
            sql: SQL text; may contain ``%s`` placeholders when ``args`` is given.
            args: optional parameters passed to ``cursor.execute`` so values
                do not have to be formatted into the SQL string by hand.

        On any error the error is logged and the transaction is rolled back;
        the error itself is not re-raised. Returns ``None`` in both cases.
        """
        try:
            cursor = self._open_cursor()
            cursor.execute(sql, args)
            self.db.commit()
        except Exception as err:
            # Log like the query methods do instead of failing silently.
            logger.error(err)
            self.db.rollback()

    def query_one(self, sql, args=None):
        """Run a query and return its first row.

        Args:
            sql: SQL text; may contain ``%s`` placeholders when ``args`` is given.
            args: optional parameters passed to ``cursor.execute``.

        Returns:
            The result of ``cursor.fetchone()``, or ``None`` if an error
            occurred (the error is logged, not raised).
        """
        try:
            cursor = self._open_cursor()
            cursor.execute(sql, args)
            return cursor.fetchone()
        except Exception as err:
            logger.error(err)
            return None

    def query_all(self, sql, args=None):
        """Run a query and return all of its rows.

        Args:
            sql: SQL text; may contain ``%s`` placeholders when ``args`` is given.
            args: optional parameters passed to ``cursor.execute``.

        Returns:
            The result of ``cursor.fetchall()``, or ``None`` if an error
            occurred (the error is logged, not raised).
        """
        try:
            cursor = self._open_cursor()
            cursor.execute(sql, args)
            return cursor.fetchall()
        except Exception as err:
            logger.error(err)
            return None

    def closedb(self):
        """Close the cursor and the database connection, then stop the tunnel.

        Each step runs even if an earlier one raises, so a failing cursor or
        connection close cannot leave the SSH tunnel running. The first
        exception raised still propagates to the caller.
        """
        try:
            if self.cursor is not None:
                cursor, self.cursor = self.cursor, None
                cursor.close()
        finally:
            try:
                # Skip close() on a connection that is already closed.
                if self.db is not None and self.db.open:
                    self.db.close()
            finally:
                self.server.stop()

if __name__ == "__main__":
    # Manual smoke test: open the tunnel and connection, fetch one row from a
    # placeholder table, print it, and release everything again.
    handle = DataBaseHandle()
    try:
        row = handle.query_one("select * from xxx")
        print(row)
    finally:
        # Always close the connection and stop the SSH tunnel, even if the
        # query or the print is interrupted.
        handle.closedb()
Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐