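After creating a data warehouse cluster, you can connect to it with the PyGreSQL third-party library, access DWS from Python, and perform operations on tables.
Preparations Before Connecting to a Cluster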
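- An elastic IP address (EIP) has been bound to the DWS cluster.
- You have obtained the user name and password of the DWS cluster's database administrator.
Notice: MD5 has been shown to be vulnerable to collisions, so it must not be used for password verification. DWS is secure by default and disables MD5 password verification, which can prevent open-source clients from connecting. Check whether the database parameter password_encryption_type is set to 1. If it is not, change it as described in "[Modifying Database Parameters](https://www.ctyun.cn/document/10014061/10047788)" in the User Guide, and then change the password of the database user you plan to use once.
Note: For security reasons, DWS no longer stores MD5 password digests by default, so open-source drivers and clients cannot connect. To use the MD5 authentication algorithm of the open-source protocol, adjust the password policy and then either create a new user or change the password of an existing user.
The database never stores your plaintext password. It stores a hash digest of the password and, during authentication, compares it with the digest sent by the client (salting is applied in between). When you change the password algorithm policy, the database therefore cannot recover your password to generate a digest with the new algorithm. You must change the password manually or create a new user; the new password is then stored as a digest produced by the configured hash algorithm and used for subsequent authentication.
- You have obtained the public network address of the DWS cluster, including the IP address and port. For details, see "Obtaining the Cluster Connection Address".
- The PyGreSQL third-party library has been installed.
Note: On CentOS, Red Hat, and similar operating systems, install it with yum: yum install PyGreSQL.
PyGreSQL depends on the libpq dynamic library of PostgreSQL (32-bit PyGreSQL requires 32-bit libpq; 64-bit PyGreSQL requires 64-bit libpq). On Linux, yum resolves this dependency. On Windows, libpq must be installed before PyGreSQL can be used, in either of the following ways:
1. Install PostgreSQL and add the locations of the libpq, ssl, and crypto dynamic libraries to the PATH environment variable.
2. Install psqlodbc and use the libpq, ssl, and crypto dynamic libraries shipped with the PostgreSQL ODBC driver.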
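The password note above can be illustrated with the MD5 scheme of the open-source PostgreSQL protocol. The sketch below assumes that scheme (stored digest = `md5` followed by the MD5 of the password concatenated with the user name; login response = that digest hashed again with a per-connection salt). The function names are made up for illustration, and the storage format DWS uses for its other algorithms is not shown. Because the stored value is a one-way digest, the server cannot convert it into a digest for a different algorithm, which is why a password change is needed after switching password_encryption_type.

```python
import hashlib
import os


def stored_md5_digest(password: str, user: str) -> str:
    """Digest kept by the server under the PostgreSQL MD5 scheme (never the plaintext)."""
    return "md5" + hashlib.md5((password + user).encode("utf-8")).hexdigest()


def md5_login_response(password: str, user: str, salt: bytes) -> str:
    """Value the client sends: the stored digest hashed again with the server's salt."""
    inner = stored_md5_digest(password, user)[3:].encode("ascii")
    return "md5" + hashlib.md5(inner + salt).hexdigest()


if __name__ == "__main__":
    salt = os.urandom(4)  # the server picks a fresh 4-byte salt for each connection
    print(stored_md5_digest("password", "dbadmin"))
    print(md5_login_response("password", "dbadmin", salt))
```

The salt makes every login response different, while the stored digest stays fixed until the password is changed.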
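Constraints
PyGreSQL is a client interface built for PostgreSQL, so DWS does not support all of its features. The following table lists the support status.
Note: The support status below is based on Python 3.8.5 and PyGreSQL 5.2.4.
DWS support for the main PyGreSQL interfaces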
PyGreSQL category | Interface | Supported | Remarks |
---|---|---|---|
pg | | | |
Module functions and constants | connect–Open a PostgreSQL connection | Y | - |
Module functions and constants | get_pqlib_version–get the version of libpq | Y | - |
Module functions and constants | get/set_defhost–default server host [DV] | Y | - |
Module functions and constants | get/set_defport–default server port [DV] | Y | - |
Module functions and constants | get/set_defopt–default connection options [DV] | Y | - |
Module functions and constants | get/set_defbase–default database name [DV] | Y | - |
Module functions and constants | get/set_defuser–default database user [DV] | Y | - |
Module functions and constants | get/set_defpasswd–default database password [DV] | Y | - |
Module functions and constants | escape_string–escape a string for use within SQL | Y | - |
Module functions and constants | escape_bytea–escape binary data for use within SQL | Y | - |
Module functions and constants | unescape_bytea–unescape data that has been retrieved as text | Y | - |
Module functions and constants | get/set_namedresult–conversion to named tuples | Y | - |
Module functions and constants | get/set_decimal–decimal type to be used for numeric values | Y | - |
Module functions and constants | get/set_decimal_point–decimal mark used for monetary values | Y | - |
Module functions and constants | get/set_bool–whether boolean values are returned as bool objects | Y | - |
Module functions and constants | get/set_array–whether arrays are returned as list objects | Y | - |
Module functions and constants | get/set_bytea_escaped–whether bytea data is returned escaped | Y | - |
Module functions and constants | get/set_jsondecode–decoding JSON format | Y | - |
Module functions and constants | get/set_cast_hook–fallback typecast function | Y | - |
Module functions and constants | get/set_datestyle–assume a fixed date style | Y | - |
Module functions and constants | get/set_typecast–custom typecasting | Y | - |
Module functions and constants | cast_array/record–fast parsers for arrays and records | Y | - |
Module functions and constants | Type helpers | Y | - |
Module functions and constants | Module constants | Y | - |
Connection–The connection object | query–execute a SQL command string | Y | - |
Connection–The connection object | send_query - executes a SQL command string asynchronously | Y | - |
Connection–The connection object | query_prepared–execute a prepared statement | Y | - |
Connection–The connection object | prepare–create a prepared statement | Y | - |
Connection–The connection object | describe_prepared–describe a prepared statement | Y | - |
Connection–The connection object | reset–reset the connection | Y | - |
Connection–The connection object | poll - completes an asynchronous connection | Y | - |
Connection–The connection object | cancel–abandon processing of current SQL command | Y | - |
Connection–The connection object | close–close the database connection | Y | - |
Connection–The connection object | transaction–get the current transaction state | Y | - |
Connection–The connection object | parameter–get a current server parameter setting | Y | - |
Connection–The connection object | date_format–get the currently used date format | Y | - |
Connection–The connection object | fileno–get the socket used to connect to the database | Y | - |
Connection–The connection object | set_non_blocking - set the non-blocking status of the connection | Y | - |
Connection–The connection object | is_non_blocking - report the blocking status of the connection | Y | - |
Connection–The connection object | getnotify–get the last notify from the server | N | The database does not support LISTEN/NOTIFY. |
Connection–The connection object | inserttable–insert a list into a table | Y | If a field in the COPY data contains \n, enclose that field in double quotation marks. |
Connection–The connection object | get/set_notice_receiver–custom notice receiver | Y | - |
Connection–The connection object | putline–write a line to the server socket [DA] | Y | - |
Connection–The connection object | getline–get a line from server socket [DA] | Y | - |
Connection–The connection object | endcopy–synchronize client and server [DA] | Y | - |
Connection–The connection object | locreate–create a large object in the database [LO] | N | Large object operation |
Connection–The connection object | getlo–build a large object from given oid [LO] | N | Large object operation |
Connection–The connection object | loimport–import a file to a large object [LO] | N | Large object operation |
Connection–The connection object | Object attributes | Y | - |
The DB wrapper class | Initialization | Y | - |
The DB wrapper class | pkey–return the primary key of a table | Y | - |
The DB wrapper class | get_databases–get list of databases in the system | Y | - |
The DB wrapper class | get_relations–get list of relations in connected database | Y | - |
The DB wrapper class | get_tables–get list of tables in connected database | Y | - |
The DB wrapper class | get_attnames–get the attribute names of a table | Y | - |
The DB wrapper class | has_table_privilege–check table privilege | Y | - |
The DB wrapper class | get/set_parameter–get or set run-time parameters | Y | - |
The DB wrapper class | begin/commit/rollback/savepoint/release–transaction handling | Y | - |
The DB wrapper class | get–get a row from a database table or view | Y | - |
The DB wrapper class | insert–insert a row into a database table | Y | - |
The DB wrapper class | update–update a row in a database table | Y | - |
The DB wrapper class | upsert–insert a row with conflict resolution | Y | - |
The DB wrapper class | query–execute a SQL command string | Y | - |
The DB wrapper class | query_formatted–execute a formatted SQL command string | Y | - |
The DB wrapper class | query_prepared–execute a prepared statement | Y | - |
The DB wrapper class | prepare–create a prepared statement | Y | - |
The DB wrapper class | describe_prepared–describe a prepared statement | Y | - |
The DB wrapper class | delete_prepared–delete a prepared statement | Y | - |
The DB wrapper class | clear–clear row values in memory | Y | - |
The DB wrapper class | delete–delete a row from a database table | Y | The row must have a unique key or a primary key. |
The DB wrapper class | truncate–quickly empty database tables | Y | - |
The DB wrapper class | get_as_list/dict–read a table as a list or dictionary | Y | - |
The DB wrapper class | escape_literal/identifier/string/bytea–escape for SQL | Y | - |
The DB wrapper class | unescape_bytea– unescape data retrieved from the database | Y | - |
The DB wrapper class | encode/decode_json–encode and decode JSON data | Y | - |
The DB wrapper class | use_regtypes–determine use of regular type names | Y | - |
The DB wrapper class | notification_handler–create a notification handler | N | The database does not support LISTEN/NOTIFY. |
The DB wrapper class | Attributes of the DB wrapper class | Y | - |
Query methods | getresult–get query values as list of tuples | Y | - |
Query methods | dictresult/dictiter–get query values as dictionaries | Y | - |
Query methods | namedresult/namediter–get query values as named tuples | Y | - |
Query methods | scalarresult/scalariter–get query values as scalars | Y | - |
Query methods | one/onedict/onenamed/onescalar–get one result of a query | Y | - |
Query methods | single/singledict/singlenamed/singlescalar–get single result of a query | Y | - |
Query methods | listfields–list fields names of previous query result | Y | - |
Query methods | fieldname, fieldnum–field name/number conversion | Y | - |
Query methods | fieldinfo–detailed info about query result fields | Y | - |
Query methods | ntuples–return number of tuples in query object | Y | - |
Query methods | memsize–return number of bytes allocated by query result | Y | - |
LargeObject–Large Objects | open–open a large object | N | Large object operation |
LargeObject–Large Objects | close–close a large object | N | Large object operation |
LargeObject–Large Objects | read, write, tell, seek, unlink–file-like large object handling | N | Large object operation |
LargeObject–Large Objects | size–get the large object size | N | Large object operation |
LargeObject–Large Objects | export–save a large object to a file | N | Large object operation |
LargeObject–Large Objects | Object attributes | N | Large object operation |
The Notification Handler | Instantiating the notification handler | N | The database does not support LISTEN/NOTIFY. |
The Notification Handler | Invoking the notification handler | N | The database does not support LISTEN/NOTIFY. |
The Notification Handler | Sending notifications | N | The database does not support LISTEN/NOTIFY. |
The Notification Handler | Auxiliary methods | N | The database does not support LISTEN/NOTIFY. |
pgdb | | | |
Module functions and constants | connect–Open a PostgreSQL connection | Y | - |
Module functions and constants | get/set/reset_typecast–Control the global typecast functions | Y | - |
Module functions and constants | Module constants | Y | - |
Module functions and constants | Errors raised by this module | Y | - |
Connection–The connection object | close–close the connection | Y | - |
Connection–The connection object | commit–commit the connection | Y | - |
Connection–The connection object | rollback–roll back the connection | Y | - |
Connection–The connection object | cursor–return a new cursor object | Y | - |
Connection–The connection object | Attributes that are not part of the standard | Y | - |
Cursor–The cursor object | description–details regarding the result columns | Y | - |
Cursor–The cursor object | rowcount–number of rows of the result | Y | - |
Cursor–The cursor object | close–close the cursor | Y | - |
Cursor–The cursor object | execute–execute a database operation | Y | - |
Cursor–The cursor object | executemany–execute many similar database operations | Y | - |
Cursor–The cursor object | callproc–Call a stored procedure | Y | - |
Cursor–The cursor object | fetchone–fetch next row of the query result | Y | - |
Cursor–The cursor object | fetchmany–fetch next set of rows of the query result | Y | - |
Cursor–The cursor object | fetchall–fetch all rows of the query result | Y | - |
Cursor–The cursor object | arraysize - the number of rows to fetch at a time | Y | - |
Cursor–The cursor object | Methods and attributes that are not part of the standard | Y | - |
Type–Type objects and constructors | Type constructors | Y | - |
Type–Type objects and constructors | Type objects | Y | - |
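The support status above was verified against Python 3.8.5 and PyGreSQL 5.2.4. A small sketch for checking a client environment against those versions is shown below. The helper names are hypothetical, and treating newer versions as acceptable is an assumption, not a documented guarantee.

```python
import sys


def version_tuple(version: str) -> tuple:
    """Turn '5.2.4' into (5, 2, 4); stop at the first non-numeric part."""
    parts = []
    for piece in version.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def matches_tested_versions(pygresql_version: str, python_version: tuple) -> bool:
    """True if both versions are at least the ones the table was verified against."""
    return (version_tuple(pygresql_version) >= (5, 2, 4)
            and tuple(python_version[:3]) >= (3, 8, 5))


if __name__ == "__main__":
    try:
        import pg  # PyGreSQL classic module
    except ImportError:
        print("PyGreSQL is not installed")
    else:
        print(matches_tested_versions(pg.__version__, sys.version_info))
```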
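Using the PyGreSQL Third-Party Library to Connect to a Cluster in Linux
1. Log in to the Linux environment as user root.
2. Run the following command to create the python_dws.py file:
vi python_dws.py
Copy and paste the following content into python_dws.py: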
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import print_function

import pg


def create_table(connection):
    print("Begin to create table")
    try:
        connection.query("drop table if exists test;"
                         "create table test(id int, name text);")
    except pg.InternalError as e:
        print(e)
    else:
        print("Table created successfully")


def insert_data(connection):
    print("Begin to insert data")
    try:
        connection.query("insert into test values(1,'number1');")
        connection.query("insert into test values(2,'number2');")
        connection.query("insert into test values(3,'number3');")
    except pg.InternalError as e:
        print(e)
    else:
        print("Insert data successfully")


def update_data(connection):
    print("Begin to update data")
    try:
        # For UPDATE, query() returns the number of affected rows
        result = connection.query("update test set name = 'numberupdated' where id=1;")
        print("Total number of rows updated :", result)
        result = connection.query("select * from test order by 1;")
        rows = result.getresult()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pg.InternalError as e:
        print(e)
    else:
        print("After Update, Operation done successfully")


def delete_data(connection):
    print("Begin to delete data")
    try:
        # For DELETE, query() returns the number of affected rows
        result = connection.query("delete from test where id=3;")
        print("Total number of rows deleted :", result)
        result = connection.query("select * from test order by 1;")
        rows = result.getresult()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pg.InternalError as e:
        print(e)
    else:
        print("After Delete,Operation done successfully")


def select_data(connection):
    print("Begin to select data")
    try:
        result = connection.query("select * from test order by 1;")
        rows = result.getresult()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1])
    except pg.InternalError as e:
        print(e)
        print("select failed")
    else:
        print("Operation done successfully")


if __name__ == '__main__':
    try:
        conn = pg.DB(host='10.154.70.231',
                     port=8000,
                     dbname='gaussdb',    # Database to connect to
                     user='dbadmin',
                     passwd='password')   # Database user password
    except pg.InternalError as ex:
        print(ex)
        print("Connect database failed")
    else:
        print("Opened database successfully")
        create_table(conn)
        insert_data(conn)
        select_data(conn)
        update_data(conn)
        delete_data(conn)
        conn.close()
Alternatively, use the DB-API interface (the pgdb module):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import print_function

import pg
import pgdb


def create_table(connection):
    print("Begin to create table")
    try:
        cursor = connection.cursor()
        cursor.execute("drop table if exists test;"
                       "create table test(id int, name text);")
        connection.commit()
    except pg.InternalError as e:
        print(e)
    else:
        print("Table created successfully")
        cursor.close()


def insert_data(connection):
    print("Begin to insert data")
    try:
        cursor = connection.cursor()
        cursor.execute("insert into test values(1,'number1');")
        cursor.execute("insert into test values(2,'number2');")
        cursor.execute("insert into test values(3,'number3');")
        connection.commit()
    except pg.InternalError as e:
        print(e)
    else:
        print("Insert data successfully")
        cursor.close()


def update_data(connection):
    print("Begin to update data")
    try:
        cursor = connection.cursor()
        cursor.execute("update test set name = 'numberupdated' where id=1;")
        connection.commit()
        print("Total number of rows updated :", cursor.rowcount)
        cursor.execute("select * from test;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pg.InternalError as e:
        print(e)
    else:
        print("After Update, Operation done successfully")
        cursor.close()


def delete_data(connection):
    print("Begin to delete data")
    try:
        cursor = connection.cursor()
        cursor.execute("delete from test where id=3;")
        connection.commit()
        print("Total number of rows deleted :", cursor.rowcount)
        cursor.execute("select * from test;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pg.InternalError as e:
        print(e)
    else:
        print("After Delete,Operation done successfully")
        cursor.close()


def select_data(connection):
    print("Begin to select data")
    try:
        cursor = connection.cursor()
        cursor.execute("select * from test;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pg.InternalError as e:
        print(e)
        print("select failed")
    else:
        print("Operation done successfully")
        cursor.close()


if __name__ == '__main__':
    try:
        conn = pgdb.connect(host='10.154.70.231',
                            port='8000',
                            database='gaussdb',    # Database to connect to
                            user='dbadmin',
                            password='password')   # Database user password
    except pg.InternalError as ex:
        print(ex)
        print("Connect database failed")
    else:
        print("Opened database successfully")
        create_table(conn)
        insert_data(conn)
        select_data(conn)
        update_data(conn)
        delete_data(conn)
        conn.close()
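3. Modify the cluster's public network address, port number, database name, database user name, and database password in python_dws.py to match your actual cluster.
Note: The PyGreSQL interface does not retry connections. Implement retry handling in your application code.
conn = pgdb.connect(host='10.154.70.231',
                    port='8000',
                    database='gaussdb',    # Database to connect to
                    user='dbadmin',
                    password='password')   # Database user password
4. Run the following command to connect to the cluster using the PyGreSQL third-party library:
python python_dws.py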
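Because PyGreSQL does not retry failed connections, the application has to do so itself. A minimal sketch of such a wrapper follows. `connect_with_retry` and its `attempts`, `delay`, and `retry_on` parameters are hypothetical names chosen for illustration and are not part of PyGreSQL. The wrapper accepts any connect callable, so it can be used with either `pg.DB` or `pgdb.connect`.

```python
import time


def connect_with_retry(connect, attempts=3, delay=1.0, retry_on=(Exception,), **params):
    """Call connect(**params); on failure wait and retry, doubling the delay each time."""
    for attempt in range(1, attempts + 1):
        try:
            return connect(**params)
        except retry_on as exc:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the last error
            print("Connect attempt %d failed: %s; retrying in %.1fs"
                  % (attempt, exc, delay))
            time.sleep(delay)
            delay *= 2
```

For example, `conn = connect_with_retry(pgdb.connect, retry_on=(pg.InternalError,), host='10.154.70.231', port='8000', database='gaussdb', user='dbadmin', password='password')` retries only on the error type that the sample script already catches. The doubling delay is one possible design choice; adjust it to your workload.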
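Using the PyGreSQL Third-Party Library to Connect to a Cluster in Windows
1. In Windows, click the Start button, type cmd in the search box, and click cmd.exe in the result list to open the Command Prompt window.
2. In the Command Prompt window, run the following command to create the python_dws.py file:
type nul> python_dws.py
Copy and paste the following content into python_dws.py: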
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import print_function

import pg


def create_table(connection):
    print("Begin to create table")
    try:
        connection.query("drop table if exists test;"
                         "create table test(id int, name text);")
    except pg.InternalError as e:
        print(e)
    else:
        print("Table created successfully")


def insert_data(connection):
    print("Begin to insert data")
    try:
        connection.query("insert into test values(1,'number1');")
        connection.query("insert into test values(2,'number2');")
        connection.query("insert into test values(3,'number3');")
    except pg.InternalError as e:
        print(e)
    else:
        print("Insert data successfully")


def update_data(connection):
    print("Begin to update data")
    try:
        # For UPDATE, query() returns the number of affected rows
        result = connection.query("update test set name = 'numberupdated' where id=1;")
        print("Total number of rows updated :", result)
        result = connection.query("select * from test order by 1;")
        rows = result.getresult()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pg.InternalError as e:
        print(e)
    else:
        print("After Update, Operation done successfully")


def delete_data(connection):
    print("Begin to delete data")
    try:
        # For DELETE, query() returns the number of affected rows
        result = connection.query("delete from test where id=3;")
        print("Total number of rows deleted :", result)
        result = connection.query("select * from test order by 1;")
        rows = result.getresult()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pg.InternalError as e:
        print(e)
    else:
        print("After Delete,Operation done successfully")


def select_data(connection):
    print("Begin to select data")
    try:
        result = connection.query("select * from test order by 1;")
        rows = result.getresult()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1])
    except pg.InternalError as e:
        print(e)
        print("select failed")
    else:
        print("Operation done successfully")


if __name__ == '__main__':
    try:
        conn = pg.DB(host='10.154.70.231',
                     port=8000,
                     dbname='gaussdb',    # Database to connect to
                     user='dbadmin',
                     passwd='password')   # Database user password
    except pg.InternalError as ex:
        print(ex)
        print("Connect database failed")
    else:
        print("Opened database successfully")
        create_table(conn)
        insert_data(conn)
        select_data(conn)
        update_data(conn)
        delete_data(conn)
        conn.close()
Alternatively, use the DB-API interface (the pgdb module):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import print_function

import pg
import pgdb


def create_table(connection):
    print("Begin to create table")
    try:
        cursor = connection.cursor()
        cursor.execute("drop table if exists test;"
                       "create table test(id int, name text);")
        connection.commit()
    except pg.InternalError as e:
        print(e)
    else:
        print("Table created successfully")
        cursor.close()


def insert_data(connection):
    print("Begin to insert data")
    try:
        cursor = connection.cursor()
        cursor.execute("insert into test values(1,'number1');")
        cursor.execute("insert into test values(2,'number2');")
        cursor.execute("insert into test values(3,'number3');")
        connection.commit()
    except pg.InternalError as e:
        print(e)
    else:
        print("Insert data successfully")
        cursor.close()


def update_data(connection):
    print("Begin to update data")
    try:
        cursor = connection.cursor()
        cursor.execute("update test set name = 'numberupdated' where id=1;")
        connection.commit()
        print("Total number of rows updated :", cursor.rowcount)
        cursor.execute("select * from test;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pg.InternalError as e:
        print(e)
    else:
        print("After Update, Operation done successfully")
        cursor.close()


def delete_data(connection):
    print("Begin to delete data")
    try:
        cursor = connection.cursor()
        cursor.execute("delete from test where id=3;")
        connection.commit()
        print("Total number of rows deleted :", cursor.rowcount)
        cursor.execute("select * from test;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pg.InternalError as e:
        print(e)
    else:
        print("After Delete,Operation done successfully")
        cursor.close()


def select_data(connection):
    print("Begin to select data")
    try:
        cursor = connection.cursor()
        cursor.execute("select * from test;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pg.InternalError as e:
        print(e)
        print("select failed")
    else:
        print("Operation done successfully")
        cursor.close()


if __name__ == '__main__':
    try:
        conn = pgdb.connect(host='10.154.70.231',
                            port='8000',
                            database='gaussdb',    # Database to connect to
                            user='dbadmin',
                            password='password')   # Database user password
    except pg.InternalError as ex:
        print(ex)
        print("Connect database failed")
    else:
        print("Opened database successfully")
        create_table(conn)
        insert_data(conn)
        select_data(conn)
        update_data(conn)
        delete_data(conn)
        conn.close()
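3. Modify the cluster's public network address, port number, database name, database user name, and database password in python_dws.py to match your actual cluster.
Note: The PyGreSQL interface does not retry connections. Implement retry handling in your application code.
conn = pgdb.connect(host='10.154.70.231',
                    port='8000',
                    database='gaussdb',    # Database to connect to
                    user='dbadmin',
                    password='password')   # Database user password
4. Run the following command to connect to the cluster using the PyGreSQL third-party library:
python python_dws.py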
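On Windows, PyGreSQL can only be loaded if libpq and its ssl and crypto dependencies are discoverable, as described in the preparations. If the script fails while importing `pg`, one thing worth checking is whether a PATH directory actually contains the library. The sketch below does that. `dirs_containing` is a hypothetical helper, and `libpq.dll` is assumed to be the file name of libpq on Windows.

```python
import os


def dirs_containing(filename, path_value=None):
    """Return the PATH directories that contain `filename`, in search order."""
    if path_value is None:
        path_value = os.environ.get("PATH", "")
    return [directory for directory in path_value.split(os.pathsep)
            if directory and os.path.isfile(os.path.join(directory, filename))]


if __name__ == "__main__":
    hits = dirs_containing("libpq.dll")
    print(hits if hits else "libpq.dll was not found on PATH")
```

An empty result suggests that the PostgreSQL or psqlodbc library directory has not been added to PATH yet.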