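After creating a data warehouse cluster, you can connect to it with the third-party library psycopg2. This lets you access DWS from Python and perform all kinds of table operations.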
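Preparations Before Connecting to the Cluster
- The DWS cluster has been bound to an elastic IP address.
- You have obtained the username and password of the DWS cluster's database administrator.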
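Notice: The MD5 algorithm has been proven vulnerable to collisions, so its use for password verification is strictly prohibited. DWS is secure by default and disables MD5-based password verification, which can prevent open-source clients from connecting. First check whether the database parameter password_encryption_type is set to 1. If it is not, change it as described in "[Modifying Database Parameters](https://www.ctyun.cn/document/10014061/10047788)" in the *User Guide*. Then change the password of the database user you plan to use once.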
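Note: For security reasons, DWS no longer stores password digests with MD5 by default, so open-source drivers and clients cannot connect to the database. Adjust the password policy, then either create a new user or change the password of an existing user once. Only after that can the MD5 authentication algorithm of the open-source protocol be used.
The database never stores a user's plaintext password. It stores a hash digest of the password and, during verification, compares it with the digest sent by the client (a salt is applied in between). Therefore, after you change the password hashing policy, the database cannot recover your password to regenerate a digest with the new algorithm. You must change the password manually once or create a new user; the new password is then stored as a digest computed with the algorithm you configured and is used for subsequent connection authentication.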
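- You have obtained the public network access address of the DWS cluster, including the IP address and port. For details, see [Obtaining the Cluster Connection Address](https://www.ctyun.cn/document/10014061/10047782).
- The psycopg2 third-party library has been installed. Download address: [https://pypi.org/project/psycopg2/](https://pypi.org/project/psycopg2/). For installation and deployment, see [https://www.psycopg.org/install/](https://www.psycopg.org/install/).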
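Note:
On operating systems such as CentOS and Red Hat, install it with yum: `yum install python-psycopg2`.
psycopg2 depends on PostgreSQL's libpq dynamic library (a 32-bit psycopg2 requires a 32-bit libpq, and a 64-bit psycopg2 requires a 64-bit libpq). On Linux, this dependency can be resolved with yum. On Windows, libpq must be installed before psycopg2 can be used. There are two main ways to do this:
- Install PostgreSQL and add the locations of the libpq, ssl, and crypto dynamic libraries to the PATH environment variable.
- Install psqlODBC and use the libpq, ssl, and crypto dynamic libraries shipped with the PostgreSQL ODBC driver.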
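To confirm that the interpreter, psycopg2, and libpq fit together as described above, a small check like the following can help. It is a sketch, not an official tool: describe_environment is a hypothetical name, the interpreter's bitness is derived from its pointer size (a compiled psycopg2 must match it), and psycopg2.__version__ and psycopg2.extensions.libpq_version() (psycopg2 2.7 and later) report the driver and the libpq it actually loaded. On Windows, a missing or mismatched libpq typically shows up as an ImportError when psycopg2 is imported.

```python
import platform
import struct


def describe_environment():
    # Pointer size in bits: 32 for a 32-bit interpreter, 64 for a 64-bit one.
    # psycopg2 and libpq must both be built for this same bitness.
    info = {
        "python": platform.python_version(),
        "bits": struct.calcsize("P") * 8,
    }
    try:
        import psycopg2
        import psycopg2.extensions
    except ImportError as e:
        # psycopg2 is not installed, or its libpq could not be loaded
        info["psycopg2_error"] = str(e)
    else:
        info["psycopg2"] = psycopg2.__version__
        # Version of the libpq loaded at run time, e.g. 120004 for 12.4
        info["libpq"] = psycopg2.extensions.libpq_version()
    return info


if __name__ == "__main__":
    for key, value in describe_environment().items():
        print(key, "=", value)
```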
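Usage Constraints
psycopg2 is a client interface designed for PostgreSQL, so DWS does not support all of its functionality. The following table lists the support status.
Note: The support status below is based on Python 3.8.5 and psycopg2 2.9.1.
Support for main psycopg2 interfaces in DWS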
Class | Category | Function/Member variable | Supported | Remarks |
---|---|---|---|---|
connection | basic | cursor(name=None, cursor_factory=None, scrollable=None, withhold=False) | Y | - |
connection | basic | commit() | Y | - |
connection | basic | rollback() | Y | - |
connection | basic | close() | Y | - |
connection | Two-phase commit support methods | xid(format_id, gtrid, bqual) | Y | - |
connection | Two-phase commit support methods | tpc_begin(xid) | Y | - |
connection | Two-phase commit support methods | tpc_prepare() | N | The kernel does not support explicit PREPARE TRANSACTION |
connection | Two-phase commit support methods | tpc_commit([xid]) | Y | - |
connection | Two-phase commit support methods | tpc_rollback([xid]) | Y | - |
connection | Two-phase commit support methods | tpc_recover() | Y | - |
connection | Two-phase commit support methods | closed | Y | - |
connection | Two-phase commit support methods | cancel() | Y | - |
connection | Two-phase commit support methods | reset() | N | DISCARD ALL is not supported |
connection | Two-phase commit support methods | dsn | Y | - |
connection | Transaction control methods and attributes | set_session(isolation_level=None, readonly=None, deferrable=None, autocommit=None) | Y | The database does not support setting default_transaction_read_only in a session |
connection | Transaction control methods and attributes | autocommit | Y | - |
connection | Transaction control methods and attributes | isolation_level | Y | - |
connection | Transaction control methods and attributes | readonly | N | The database does not support setting default_transaction_read_only in a session |
connection | Transaction control methods and attributes | deferrable | Y | - |
connection | Transaction control methods and attributes | set_isolation_level(level) | Y | - |
connection | Transaction control methods and attributes | encoding | Y | - |
connection | Transaction control methods and attributes | set_client_encoding(enc) | Y | - |
connection | Transaction control methods and attributes | notices | Y | - |
connection | Transaction control methods and attributes | notifies | N | The database does not support LISTEN/NOTIFY |
connection | Transaction control methods and attributes | cursor_factory | Y | - |
connection | Transaction control methods and attributes | info | Y | - |
connection | Transaction control methods and attributes | status | Y | - |
connection | Transaction control methods and attributes | lobject | N | The database does not support large object operations |
connection | Methods related to asynchronous support | poll() | Y | - |
connection | Methods related to asynchronous support | fileno() | Y | - |
connection | Methods related to asynchronous support | isexecuting() | Y | - |
connection | Interoperation with other C API modules | pgconn_ptr | Y | - |
connection | Interoperation with other C API modules | get_native_connection() | Y | - |
connection | informative methods of the native connection | get_transaction_status() | Y | - |
connection | informative methods of the native connection | protocol_version | Y | - |
connection | informative methods of the native connection | server_version | Y | - |
connection | informative methods of the native connection | get_backend_pid() | Y | Returns the ID of the logical connection, not the PID of the backend process |
connection | informative methods of the native connection | get_parameter_status(parameter) | Y | - |
connection | informative methods of the native connection | get_dsn_parameters() | Y | - |
cursor | basic | description | Y | - |
cursor | basic | close() | Y | - |
cursor | basic | closed | Y | - |
cursor | basic | connection | Y | - |
cursor | basic | name | Y | - |
cursor | basic | scrollable | N | The database does not support SCROLL CURSOR |
cursor | basic | withhold | N | A WITH HOLD cursor must be closed before commit |
cursor | Commands execution methods | execute(query, vars=None) | Y | - |
cursor | Commands execution methods | executemany(query, vars_list) | Y | - |
cursor | Commands execution methods | callproc(procname[, parameters]) | Y | - |
cursor | Commands execution methods | mogrify(operation[, parameters]) | Y | - |
cursor | Commands execution methods | setinputsizes(sizes) | Y | - |
cursor | Commands execution methods | fetchone() | Y | - |
cursor | Commands execution methods | fetchmany([size=cursor.arraysize]) | Y | - |
cursor | Commands execution methods | fetchall() | Y | - |
cursor | Commands execution methods | scroll(value[, mode='relative']) | N | The database does not support SCROLL CURSOR |
cursor | Commands execution methods | arraysize | Y | - |
cursor | Commands execution methods | itersize | Y | - |
cursor | Commands execution methods | rowcount | Y | - |
cursor | Commands execution methods | rownumber | Y | - |
cursor | Commands execution methods | lastrowid | Y | - |
cursor | Commands execution methods | query | Y | - |
cursor | Commands execution methods | statusmessage | Y | - |
cursor | Commands execution methods | cast(oid, s) | Y | - |
cursor | Commands execution methods | tzinfo_factory | Y | - |
cursor | Commands execution methods | nextset() | Y | - |
cursor | Commands execution methods | setoutputsize(size[, column]) | Y | - |
cursor | COPY-related methods | copy_from(file, table, sep='\t', null='\N', size=8192, columns=None) | Y | - |
cursor | COPY-related methods | copy_to(file, table, sep='\t', null='\N', columns=None) | Y | - |
cursor | COPY-related methods | copy_expert(sql, file, size=8192) | Y | - |
cursor | Interoperation with other C API modules | pgresult_ptr | Y | - |
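Because scroll() and scrollable are not supported, results can only be read forward. A minimal sketch, assuming any DB-API 2.0 cursor, of reading a result set in batches with fetchmany() (listed as supported). iter_batches is a hypothetical helper, and sqlite3 stands in for psycopg2 only so the sketch runs without a cluster. With psycopg2, the default cursor already holds the whole result on the client; passing name= to conn.cursor() creates a server-side cursor, which the table lists as supported (withhold is not).

```python
import sqlite3


def iter_batches(cursor, size=1000):
    # Forward-only reading: fetchmany() returns at most `size` rows per call
    # and an empty list once the result set is exhausted.
    while True:
        rows = cursor.fetchmany(size)
        if not rows:
            break
        yield rows


if __name__ == "__main__":
    # sqlite3 is only a stand-in driver; with psycopg2, pass a cursor from conn.cursor()
    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    cur.execute("create table test(id int, name text)")
    cur.executemany("insert into test values(?, ?)",
                    [(i, "number%d" % i) for i in range(1, 6)])
    cur.execute("select * from test order by 1")
    for batch in iter_batches(cur, size=2):
        print(batch)
    conn.close()
```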
Connecting to the Cluster Using psycopg2 on Linux
1. Log in to the Linux environment as user root.
2. Run the following command to create the python_dws.py file:
```shell
vi python_dws.py
```
Copy and paste the following content into python_dws.py:
```python
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from __future__ import print_function

import psycopg2


def create_table(connection):
    print("Begin to create table")
    cursor = connection.cursor()
    try:
        # Drop the test table if it already exists, then recreate it
        cursor.execute("drop table if exists test;"
                       "create table test(id int, name text);")
        connection.commit()
    except psycopg2.ProgrammingError as e:
        print(e)
        # Roll back so that the connection stays usable for the next steps
        connection.rollback()
    else:
        print("Table created successfully")
    finally:
        cursor.close()


def insert_data(connection):
    print("Begin to insert data")
    cursor = connection.cursor()
    try:
        cursor.execute("insert into test values(1,'number1');")
        cursor.execute("insert into test values(2,'number2');")
        cursor.execute("insert into test values(3,'number3');")
        connection.commit()
    except psycopg2.ProgrammingError as e:
        print(e)
        connection.rollback()
    else:
        print("Insert data successfully")
    finally:
        cursor.close()


def update_data(connection):
    print("Begin to update data")
    cursor = connection.cursor()
    try:
        cursor.execute("update test set name = 'numberupdated' where id=1;")
        connection.commit()
        print("Total number of rows updated :", cursor.rowcount)
        cursor.execute("select * from test order by 1;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except psycopg2.ProgrammingError as e:
        print(e)
        connection.rollback()
    else:
        print("After Update, Operation done successfully")
    finally:
        cursor.close()


def delete_data(connection):
    print("Begin to delete data")
    cursor = connection.cursor()
    try:
        cursor.execute("delete from test where id=3;")
        connection.commit()
        print("Total number of rows deleted :", cursor.rowcount)
        cursor.execute("select * from test order by 1;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except psycopg2.ProgrammingError as e:
        print(e)
        connection.rollback()
    else:
        print("After Delete, Operation done successfully")
    finally:
        cursor.close()


def select_data(connection):
    print("Begin to select data")
    cursor = connection.cursor()
    try:
        cursor.execute("select * from test order by 1;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except psycopg2.ProgrammingError as e:
        print(e)
        print("select failed")
        connection.rollback()
    else:
        print("Operation done successfully")
    finally:
        cursor.close()


if __name__ == '__main__':
    try:
        conn = psycopg2.connect(host='10.154.70.231',
                                port='8000',
                                database='gaussdb',  # database to connect to
                                user='dbadmin',
                                password='password')  # password of the database user
    except psycopg2.DatabaseError as ex:
        print(ex)
        print("Connect database failed")
    else:
        print("Opened database successfully")
        create_table(conn)
        insert_data(conn)
        select_data(conn)
        update_data(conn)
        delete_data(conn)
        conn.close()
```
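3. In python_dws.py, replace the cluster's public access address, port, database name, database username, and database password with the values of your cluster.
psycopg2 does not provide connection retry. You need to implement retry handling in your application code.
```python
conn = psycopg2.connect(host='10.154.70.231',
                        port='8000',
                        database='gaussdb',  # database to connect to
                        user='dbadmin',
                        password='password')  # password of the database user
```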
4. Run the following command to connect to the cluster using psycopg2:
```shell
python python_dws.py
```
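Since psycopg2 leaves connection retry to the application, the retry loop has to live in your code. The following is a minimal sketch under stated assumptions: connect_with_retry and its parameters are hypothetical names, and retrying only on psycopg2.OperationalError (the exception psycopg2.connect raises when it cannot reach the server) is a choice you may want to adjust.

```python
import time


def connect_with_retry(connect, attempts=3, delay=1.0, backoff=2.0,
                       retry_on=(Exception,)):
    # Call `connect` until it succeeds or `attempts` tries have failed,
    # sleeping `delay` seconds between tries and multiplying it by `backoff`.
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except retry_on as e:
            last_error = e
            print("Connect attempt %d failed: %s" % (attempt, e))
            if attempt < attempts:
                time.sleep(delay)
                delay *= backoff
    raise last_error


# Usage with psycopg2 (connection values as in the sample script):
# import psycopg2
# conn = connect_with_retry(
#     lambda: psycopg2.connect(host='10.154.70.231', port='8000',
#                              database='gaussdb', user='dbadmin',
#                              password='password'),
#     retry_on=(psycopg2.OperationalError,))
```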
Connecting to the Cluster Using psycopg2 on Windows
1. On Windows, click "Start", type cmd in the search box, and then click "cmd.exe" in the result list to open a Command Prompt window.
2. In the Command Prompt window, run the following command to create the python_dws.py file:
```
type nul > python_dws.py
```
Copy and paste the following content into python_dws.py:
```python
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from __future__ import print_function

import psycopg2


def create_table(connection):
    print("Begin to create table")
    cursor = connection.cursor()
    try:
        # Drop the test table if it already exists, then recreate it
        cursor.execute("drop table if exists test;"
                       "create table test(id int, name text);")
        connection.commit()
    except psycopg2.ProgrammingError as e:
        print(e)
        # Roll back so that the connection stays usable for the next steps
        connection.rollback()
    else:
        print("Table created successfully")
    finally:
        cursor.close()


def insert_data(connection):
    print("Begin to insert data")
    cursor = connection.cursor()
    try:
        cursor.execute("insert into test values(1,'number1');")
        cursor.execute("insert into test values(2,'number2');")
        cursor.execute("insert into test values(3,'number3');")
        connection.commit()
    except psycopg2.ProgrammingError as e:
        print(e)
        connection.rollback()
    else:
        print("Insert data successfully")
    finally:
        cursor.close()


def update_data(connection):
    print("Begin to update data")
    cursor = connection.cursor()
    try:
        cursor.execute("update test set name = 'numberupdated' where id=1;")
        connection.commit()
        print("Total number of rows updated :", cursor.rowcount)
        cursor.execute("select * from test order by 1;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except psycopg2.ProgrammingError as e:
        print(e)
        connection.rollback()
    else:
        print("After Update, Operation done successfully")
    finally:
        cursor.close()


def delete_data(connection):
    print("Begin to delete data")
    cursor = connection.cursor()
    try:
        cursor.execute("delete from test where id=3;")
        connection.commit()
        print("Total number of rows deleted :", cursor.rowcount)
        cursor.execute("select * from test order by 1;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except psycopg2.ProgrammingError as e:
        print(e)
        connection.rollback()
    else:
        print("After Delete, Operation done successfully")
    finally:
        cursor.close()


def select_data(connection):
    print("Begin to select data")
    cursor = connection.cursor()
    try:
        cursor.execute("select * from test order by 1;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except psycopg2.ProgrammingError as e:
        print(e)
        print("select failed")
        connection.rollback()
    else:
        print("Operation done successfully")
    finally:
        cursor.close()


if __name__ == '__main__':
    try:
        conn = psycopg2.connect(host='10.154.70.231',
                                port='8000',
                                database='gaussdb',  # database to connect to
                                user='dbadmin',
                                password='password')  # password of the database user
    except psycopg2.DatabaseError as ex:
        print(ex)
        print("Connect database failed")
    else:
        print("Opened database successfully")
        create_table(conn)
        insert_data(conn)
        select_data(conn)
        update_data(conn)
        delete_data(conn)
        conn.close()
```
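3. In python_dws.py, replace the cluster's public access address, port, database name, database username, and database password with the values of your cluster.
```python
conn = psycopg2.connect(host='10.154.70.231',
                        port='8000',
                        database='gaussdb',  # database to connect to
                        user='dbadmin',
                        password='password')  # password of the database user
```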
4. In the Command Prompt window, run the following command to connect to the cluster using psycopg2:
```
python python_dws.py
```
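The sample script writes literal values directly into its SQL text. Because the table lists executemany(query, vars_list) as supported, a parameterized insert is another option. The sketch below is an illustrative variant, not part of the official sample: insert_rows is a hypothetical helper, it uses psycopg2's %s placeholders so the driver quotes the values, and it relies on psycopg2 cursors acting as context managers (psycopg2 2.5 and later), which closes the cursor when the block exits.

```python
def insert_rows(connection, rows):
    # Parameterized bulk insert into the sample table test(id int, name text).
    # psycopg2 substitutes and quotes each %s itself; values are never pasted into the SQL.
    try:
        with connection.cursor() as cursor:  # cursor is closed when the block exits
            cursor.executemany("insert into test values(%s, %s);", rows)
        connection.commit()
    except Exception:
        # Undo the partial insert so the connection can be reused
        connection.rollback()
        raise


# Usage, with `conn` opened as in python_dws.py:
# insert_rows(conn, [(1, 'number1'), (2, 'number2'), (3, 'number3')])
```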
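psycopg2 Connections Do Not Support the CN Retry Feature
DWS can automatically retry SQL statements that fail during execution (CN Retry). When a statement sent by a client or driver fails, CN Retry identifies the error type and retries the statement. However, with connections created in psycopg2's default connection mode, a failed statement is not retried; the error is reported directly and the program exits. For example, in the common primary/standby switchover scenario, the following error is reported when there is no automatic retry, whereas with automatic retry the correct result is returned if the switchover completes during the retry period.
```
psycopg2.errors.ConnectionFailure: pooler: failed to create 1 connections, Error Message: remote node dn_6003_6004, detail: could not connect to server: Operation now in progress
```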
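Cause:
1. Before sending a SQL statement, psycopg2 first sends a BEGIN statement to start a transaction.
2. By design, CN Retry does not apply to statements inside a transaction block.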
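Solution:
- With a synchronous connection, explicitly end the transaction that the driver started.
```python
cursor = conn.cursor()
# Add an end statement to explicitly end the transaction started by the driver
cursor.execute("end; select * from test order by 1;")
rows = cursor.fetchall()
```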
- Use an asynchronous connection, in which transactions are started explicitly by your code rather than by the driver. For details about asynchronous connections, see the psycopg official website.
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import select

import psycopg2
import psycopg2.extensions


# wait() helper for asynchronous connections, as given in the official psycopg2 documentation
def wait(conn):
    while True:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % state)


def psycopg2_cnretry_async():
    # Create the connection
    conn = psycopg2.connect(host='10.154.70.231',
                            port='8000',
                            database='gaussdb',  # database to connect to
                            user='dbadmin',
                            password='password',  # password of the database user
                            async_=1)  # asynchronous mode; "async" is a reserved word since Python 3.7
    wait(conn)

    # Run the query
    cursor = conn.cursor()
    cursor.execute("select * from test order by 1;")
    wait(conn)
    rows = cursor.fetchall()
    for row in rows:
        print(row[0], row[1])

    # Close the cursor and the connection
    cursor.close()
    conn.close()


if __name__ == '__main__':
    psycopg2_cnretry_async()
```
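If you use the synchronous workaround, adding "end;" by hand to every statement is easy to forget. The following hypothetical helper is a sketch under the assumption that every call should run outside the transaction psycopg2 opens implicitly. Keep in mind that END is equivalent to COMMIT, so any uncommitted work in that implicit transaction is committed first.

```python
def execute_outside_driver_txn(cursor, sql, params=None):
    # Close the transaction psycopg2 opened with BEGIN, then run `sql` outside
    # a transaction block so that CN Retry can apply to it.
    # END commits any pending work in the implicit transaction.
    cursor.execute("end; " + sql, params)


# Usage, with `conn` opened synchronously:
# cursor = conn.cursor()
# execute_outside_driver_txn(cursor, "select * from test where id = %s;", (1,))
# rows = cursor.fetchall()
```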