三、对象访问控制
在PostgreSQL中,建立安全的数据库连接是由用户标识和认证技术来共同实现的,而建立连接后的安全访问保护则是基于角色的对象访问控制。
1.访问权限列表
访问权限列表(Access Control List,ACL)是对象权限管理和权限检查的基础,PostgreSQL通过操作ACL实现对象的访问控制管理。在PostgreSQL中,每个数据库对象都具有ACL。
ACL是存储控制项(Access Control Entruy,ACE)的集合。
每个ACL实际上是一个由多个AclItem(数据结构如下所示)构成的列表。每个AclItem对应一个ACE。其中,字段ai_privs是AclMode类型。AclMode是一个32位的比特位。其高16位为权限选项位,低16位为该ACE中的操作权限位。每个操作权限占一个比特位,当该比特位的取值为1时,表示ACE中的ai_grantee对应的用户(受权者)具有此对象的相应操作权限。
typedef struct AclItem
{
Oid ai_grantee; /* 受权者OID */
Oid ai_grantor; /* 授权者OID */
AclMode ai_privs; /* 权限位 */
} AclItem;
(1).ACL检查
在PostgreSQL中,主要通过访问控制表来存储对象权限的相关信息。每一个对象中都会有一个ACL。对于指定的对象,用户可以查询该对象上是否存在某权限信息。对于不同的数据库对象(如数据库、表、语言、模式、命名空间、表空间等),都根据其不同的权限属性采用了不同的函数来实现。对于表来说,该检查过程可以用函数表示为: has_table_privilege_*_*(PG_FUNCTION_ARGS)
注意,函数名中的最后两个星号分别代表参数中的用户信息和数据库对象信息,例如函数名为 has_table_privilege_name_id
表示参数中给出了用户名和表的OID,该函数将检查给定的用户是否拥有在给定表上的指定权限。
/*
* has_table_privilege_name_id
* Check user privileges on a table given
* name usename, table oid, and text priv name.
*/
Datum
has_table_privilege_name_id(PG_FUNCTION_ARGS)
{
Name username = PG_GETARG_NAME(0);
Oid tableoid = PG_GETARG_OID(1);
text *priv_type_text = PG_GETARG_TEXT_PP(2);
Oid roleid;
AclMode mode;
AclResult aclresult;
//得到角色oid
roleid = get_role_oid_or_public(NameStr(*username));
mode = convert_table_priv_string(priv_type_text);
if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(tableoid)))
PG_RETURN_NULL();
//获得用户在该表上的权限集,并且进行比较
aclresult = pg_class_aclcheck(tableoid, roleid, mode);
PG_RETURN_BOOL(aclresult == ACLCHECK_OK);
}
(2).ACL更新
ACL的更新很简单,将权限对应的标志位置为0或者1即可。
在执行授权操作时,可能会出现循环授权的情况。如下图所示,Johnney将表CheckTable的SELECT权限赋予给Tom;Tom又将之转授给David,最后David企图将同样的权限赋予给Johnney。这一系列的授权操作构成了一个环,这个环是不允许出现的。
为了检测出循环授权的情况,在进行授权时会先执行函数 check_circularity
,该函数将通过递归删除权限的方式来检查是否有环构成。其执行过程如下:
-
获取旧的ACL、要增加的授权信息以及对象的拥有者。
-
创建一个ACL副本进行操作。
-
递归删除所有被授权者所拥有的可再授权权限。
-
检查授予者是否还有可再授予权限。如果没有,则收于这不能进行授权。
static void
check_circularity(const Acl *old_acl, const AclItem *mod_aip,
Oid ownerId)
{
Acl *acl;
AclItem *aip;
int i,
num;
AclMode own_privs;
//验证acl合法性,一维且无空
check_acl(old_acl);
//保证授予的是角色,而不是PUBLIC
Assert(mod_aip->ai_grantee != ACL_ID_PUBLIC);
//拥有者有授予权限,不需要检查
if (mod_aip->ai_grantor == ownerId)
return;
//复制acl,得到一个新的列表
acl = allocacl(ACL_NUM(old_acl));
memcpy(acl, old_acl, ACL_SIZE(old_acl));
//递归删除所有被授权者所拥有的可再授权权限
cc_restart:
num = ACL_NUM(acl);
aip = ACL_DAT(acl);
for (i = 0; i < num; i++)
{
if (aip[i].ai_grantee == mod_aip->ai_grantee &&
ACLITEM_GET_GOPTIONS(aip[i]) != ACL_NO_RIGHTS)
{
Acl *new_acl;
/* We'll actually zap ordinary privs too, but no matter */
new_acl = aclupdate(acl, &aip[i], ACL_MODECHG_DEL,
ownerId, DROP_CASCADE);
pfree(acl);
acl = new_acl;
goto cc_restart;
}
}
//得到授权者的独立派生的权限
own_privs = aclmask(acl,
mod_aip->ai_grantor,
ownerId,
ACL_GRANT_OPTION_FOR(ACLITEM_GET_GOPTIONS(*mod_aip)),
ACLMASK_ALL);
own_privs = ACL_OPTION_TO_PRIVS(own_privs);
//和要授予的权限作比较,没有的话发送错误消息
if ((ACLITEM_GET_GOPTIONS(*mod_aip) & ~own_privs) != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_GRANT_OPERATION),
errmsg("grant options cannot be granted back to your own grantor")));
pfree(acl);
}
2.对象权限管理
对象权限管理主要是通过使用SQL命令GRANT/REVOKE授予或回收一个或多个角色在对象上的权限。
(1)对象授权
对象授权语句GRANT的语法结构如下:
GRANT { { SELECT | INSERT | UPDATE | DELETE | TRUNCATE | REFERENCES | TRIGGER }
[, ...] | ALL [ PRIVILEGES ] }
ON { [ TABLE ] table_name [, ...]
| ALL TABLES IN SCHEMA schema_name [, ...] }
TO role_specification [, ...] [ WITH GRANT OPTION ]
[ GRANTED BY role_specification ]
此命令表示在数据库对象上给一个或者多个角色授予特定权限。关键字PUBLIC表示权限要赋予所有角色,包括那些以后可能创建的用户。PUBLIC可以看作是一个预定义好的组,它总是包括所有角色。任何特定的角色的权限由三部分组成:直接赋予的权限、从所属的角色继承来的权限以及被赋予的PUBLIC权限。
如果声明WITH GRANT OPTION ,那么权限的被授予者可以将此权限再赋予他人,否则只能自身拥有被赋予的权限,这个选项称为”可再被授予权限“。此
权限不能赋予PUBLIC。
(2)对象权限回收
对象权限回收命令REVOKE的语法结构如下:
REVOKE [ GRANT OPTION FOR ]
{ { SELECT | INSERT | UPDATE | DELETE | TRUNCATE | REFERENCES | TRIGGER }
[, ...] | ALL [ PRIVILEGES ] }
ON { [ TABLE ] table_name [, ...]
| ALL TABLES IN SCHEMA schema_name [, ...] }
FROM role_specification [, ...]
[ GRANTED BY role_specification ]
[ CASCADE | RESTRICT ]
若指定了GRANT OPTION FOR ,则只是撤销角色对该权限的再授予能力,而不撤销权限本身。
GRANT/REVOKE命令都由函数ExecuteGrantStmt
实现,该函数只有一个类型为GrantStmt的参数。
typedef struct GrantStmt
{
NodeTag type;
bool is_grant; /* true = GRANT, false = REVOKE */
GrantTargetType targtype; /* 授权目标的对象类型 */
ObjectType objtype; /*被操作的对象类型 */
List *objects; /* list of RangeVar nodes, ObjectWithArgs
* nodes, or plain names (as Value strings) */
List *privileges; /* 被操作的权限集合 */
/* privileges == NIL denotes ALL PRIVILEGES */
List *grantees; /* 受权者集合 */
bool grant_option; /* grant or revoke grant option */
DropBehavior behavior; /* 回收权限行为(for REVOKE) */
} GrantStmt;
函数ExecuteGrantStmt
先将GrantStmt结构转化成InternalGrant结构,将命令的权限列表转化为内部的AclMode表示。当privileges取值为NIL时,表示授予或回收所有权限,此时InternalGrant的all_privs字段为true,且InternalGrant中的privileges字段被设置为ACL_NO_RIGHTS,也表示ACL_ALL_RIGHTS。
typedef struct
{
bool is_grant;//true = GRANT, false = REVOKE
ObjectType objtype;//被操作的对象类型
List *objects;//被操作的对象列表
bool all_privs;//是否授予所有权限
AclMode privileges;//比特位形式表示的操作权限
List *col_privs;//列权限
List *grantees;//OID结构
bool grant_option;
DropBehavior behavior;
} InternalGrant;
void
ExecuteGrantStmt(GrantStmt *stmt)
{
InternalGrant istmt;
ListCell *cell;
const char *errormsg;
AclMode all_privileges;
//将GrantStmt转换为InterbalGrant
istmt.is_grant = stmt->is_grant;
istmt.objtype = stmt->objtype;
//将受权者链表转化为OID链表
switch (stmt->targtype)
{
case ACL_TARGET_OBJECT:
istmt.objects = objectNamesToOids(stmt->objtype, stmt->objects);
break;
case ACL_TARGET_ALL_IN_SCHEMA:
istmt.objects = objectsInSchemaToOids(stmt->objtype, stmt->objects);
break;
/* ACL_TARGET_DEFAULTS should not be seen here */
default:
elog(ERROR, "unrecognized GrantStmt.targtype: %d",
(int) stmt->targtype);
}
//填充值
istmt.col_privs = NIL; /* may get filled below */
istmt.grantees = NIL; /* filled below */
istmt.grant_option = stmt->grant_option;
istmt.behavior = stmt->behavior;
//将RoleSpec列表转换为Oid列表。
foreach(cell, stmt->grantees)
{
RoleSpec *grantee = (RoleSpec *) lfirst(cell);
Oid grantee_uid;
switch (grantee->roletype)
{
case ROLESPEC_PUBLIC:
grantee_uid = ACL_ID_PUBLIC;
break;
default:
grantee_uid = get_rolespec_oid(grantee, false);
break;
}
istmt.grantees = lappend_oid(istmt.grantees, grantee_uid);
}
//根据对象类型,将stmt->privileges (AccessPriv节点列表)转换为AclMode位掩码。
switch (stmt->objtype)
{ //16个case
case OBJECT_TABLE:
all_privileges = ACL_ALL_RIGHTS_RELATION | ACL_ALL_RIGHTS_SEQUENCE;
errormsg = gettext_noop("invalid privilege type %s for relation");
break;
.......
}
......
//根据对象类型调用相应对象上的权限管理函数。
ExecGrantStmt_oids(&istmt);
}
对于表、模式、数据库、函数等不同的数据库对象,它们的权限管理分别调用相应的函数来完成。此类算法的构成大同小异,核心过程是通过解析GRANT/REVOKE命令获取权限信息,然后和保存在ACL中的原授权信息共同计算出实际要授予/回收的权限,最后更新对象上的ACL。以表为例描述对象上的权限管理过程。表上的权限管理函数是ExecGrant_Relation
,它的参数就是在ExecuteGrantStmt
中转换得到的InternalGrant结构,其流程图如下图所示:
3.对象权限检查
在对数据库对象进行操作时,必须要对该对象上的权限进行检查。下面以表上的权限检查为例来描述检查函数的实现过程。表上的权限检查由函数ExecCheckRTEPerms
实现,该函数只有一个类型为RangeTblEntry的参数。范围表(RTE)。
typedef struct RangeTblEntry
{
NodeTag type;
RTEKind rtekind; /* 对象类型 */
.......
Alias *alias; /* user-written alias clause, if any */
Alias *eref; /* expanded reference names */
bool lateral; /* subquery, function, or values is LATERAL? */
bool inh; /* inheritance requested? */
bool inFromCl; /* present in FROM clause? */
AclMode requiredPerms; /* 需要的访问权限 */
Oid checkAsUser; /* 角色ID */
Bitmapset *selectedCols; /* 需要有SELECT权限的列 */
Bitmapset *insertedCols; /* 需要有INSERT权限的列 */
Bitmapset *updatedCols; /* 需要有UPDATE权限的列 */
Bitmapset *extraUpdatedCols; /* generated columns being updated */
List *securityQuals; /* security barrier quals to apply, if any */
} RangeTblEntry;
如果表级权限检查没有通过,我们可以进一步判断该用户是否在该表上具有列级权限。对列的操作检查有ACL_SELECT、ACL_INSERT、ACL_UPDATE三种。如果用户有列级权限,则检查通过(仅限对此列进行操作),否则检查失败。
//检查单个RTE的访问权限
static bool
ExecCheckRTEPerms(cRangeTblEntry *rte)
{
AclMode requiredPerms;
AclMode relPerms;
AclMode remainingPerms;
Oid relOid;
Oid userid;
......
relOid = rte->relid;
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
//获取所需权限
relPerms = pg_class_aclmask(relOid, userid, requiredPerms, ACLMASK_ALL);
remainingPerms = requiredPerms & ~relPerms;
if (remainingPerms != 0)
{
int col = -1;
//权限缺失直接失败
if (remainingPerms & ~(ACL_SELECT | ACL_INSERT | ACL_UPDATE))
return false;
//检查是否拥有列级别所需要的权限
if (remainingPerms & ACL_SELECT)
{
if (bms_is_empty(rte->selectedCols))
{
if (pg_attribute_aclcheck_all(relOid, userid, ACL_SELECT,
ACLMASK_ANY) != ACLCHECK_OK)
return false;
}
while ((col = bms_next_member(rte->selectedCols, col)) >= 0)
{
/* bit #s are offset by FirstLowInvalidHeapAttributeNumber */
AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
if (attno == InvalidAttrNumber)
{
/* Whole-row reference, must have priv on all cols */
if (pg_attribute_aclcheck_all(relOid, userid, ACL_SELECT,
ACLMASK_ALL) != ACLCHECK_OK)
return false;
}
else
{
if (pg_attribute_aclcheck(relOid, attno, userid,
ACL_SELECT) != ACLCHECK_OK)
return false;
}
}
}
//对于由remainingPerms指定的插入和更新权限,修改列基本上是相同的。
if (remainingPerms & ACL_INSERT && !ExecCheckRTEPermsModified(relOid,
userid,
rte->insertedCols,
ACL_INSERT))
return false;
if (remainingPerms & ACL_UPDATE && !ExecCheckRTEPermsModified(relOid,
userid,
rte->updatedCols,
ACL_UPDATE))
return false;
}
return true;
}