溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

使用Django-xadmin+rule對象級權(quán)限的實(shí)現(xiàn)案例

發(fā)布時(shí)間:2021-05-07 14:15:21 來源:億速云 閱讀:144 作者:小新 欄目:開發(fā)技術(shù)

這篇文章將為大家詳細(xì)講解有關(guān)使用Django-xadmin+rule對象級權(quán)限的實(shí)現(xiàn)案例,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

1. 需求vs現(xiàn)狀

1.1 需求

要求做一個(gè)ERP后臺輔助管理的程序,有以下幾項(xiàng)基本要求:

1. 基本的增刪改查功能

2. 基于對象的權(quán)限控制(如:系統(tǒng)用戶分為平臺運(yùn)營人員和商家用戶,商家用戶小A只能查看編輯所屬商家記錄,而管理員可以縱覽全局)

3. 數(shù)據(jù)庫記錄導(dǎo)入導(dǎo)出(xsl, json等),并且擁有對象級的權(quán)限控制(如:小A不能導(dǎo)出小B公司的信息,更不能導(dǎo)入小B公司信息進(jìn)行更新和新增)

1.2 現(xiàn)狀

實(shí)現(xiàn)需求1:Django-admin讓我們能夠很方便的實(shí)現(xiàn)一個(gè)管理后臺程序。django-xadmin則在擁有admin基本功能的基礎(chǔ)上增加了更為豐富的功能、界面也更加漂亮。類似還有django-suit等,本文使用xadmin(功能更豐富);

實(shí)現(xiàn)需求2:django-admin,以及xadmin都只有基于model級的權(quán)限控制機(jī)制,需要自己擴(kuò)展或者使用開源解決方案,如django-guardian,django-rules,本文結(jié)合django-rules實(shí)現(xiàn)了該功能;

實(shí)現(xiàn)需求3:xadmin雖然自帶導(dǎo)出功能,但是導(dǎo)入功能沒有實(shí)現(xiàn),django自帶后臺結(jié)合django-import-export可以很容易實(shí)現(xiàn),但是xadmin并不直接兼容,只有通過xadmin的插件機(jī)制實(shí)現(xiàn)。

2. 功能實(shí)現(xiàn)

本節(jié)主要展示對象級權(quán)限功能實(shí)現(xiàn)。django工程、xadmin替換原生admin的設(shè)置,請參照官方文檔。

2.1 安裝并配置rules

pip安裝:pip install django-rules

配置settings.py

# settings.py
INSTALLED_APPS = (
  # ...
  'rules',
)
AUTHENTICATION_BACKENDS = (
  'rules.permissions.ObjectPermissionBackend',
  'django.contrib.auth.backends.ModelBackend',
)

2.2 建立model

新增CompanyUser模型表示商家賬戶(即對django自帶user模塊進(jìn)行擴(kuò)展,使每個(gè)賬號綁定自己的公司碼),新增Customer模型表示商家的客戶信息并包含公司碼字段,商家賬號只能查看、編輯、導(dǎo)入、導(dǎo)出公司碼一致的商家客戶信息

# model.py
class CompanyUser(models.Model):
  user = models.OneToOneField(User, verbose_name='用戶名')
  is_taixiang_admin = models.BooleanField('是否運(yùn)營人員', default=False)
  company_code = models.CharField('公司碼', max_length=20, blank=True, default='')

  def __unicode__(self):
    return '%s' % self.user

  class Meta:
    verbose_name = '導(dǎo)入賬號'
    verbose_name_plural = verbose_name

class Customer(models.Model):
  name = models.CharField('客戶姓名', max_length=50)
  phone = models.CharField('客戶電話', max_length=12)
  type_choice = ((1, '普通'), (2, '批發(fā)'), (3, 'VIP'))
  creator = models.ForeignKey(settings.AUTH_USER_MODEL, verbose_name='創(chuàng)建人', blank=True, null=True)
  company_code = models.CharField('公司碼', max_length=20, blank=True, null=True)

  def __unicode__(self):
    return '%s-%s-%s' % (self.company_code, self.name, self.phone1)

  class Meta:
    permissions = (
      ("simulate_import_customer", "允許模擬導(dǎo)入客戶"),
      ("import_customer", "允許導(dǎo)入客戶至商家系統(tǒng)"),
            )
    verbose_name = "客戶"
    verbose_name_plural = verbose_name

2.2 使用rule

在model統(tǒng)計(jì)目錄新增rules.py,配置該app相關(guān)的對象權(quán)限

引用rules

# rules.py
# On Python 2, you must also add the following to the top of your rules.py file, or you'll get import errors trying to import django-rules itself
from __future__ import absolute_import

import rules

# 使用修飾符@rules.predicate自定義predicates(判斷),返回True表示有權(quán)限,F(xiàn)alse表示無權(quán)限

# Predicates

@rules.predicate
def is_colleague(user, entry):
  if not entry or not hasattr(user, 'companyuser'):
    return False
  return entry.company_code == user.companyuser.company_code


@rules.predicate
def is_taixiang_admin(user):
  if not hasattr(user, 'companyuser'):
    return False
  return user.companyuser.is_taixiang_admin

# predicates間可以進(jìn)行運(yùn)算
is_colleague_or_taixiang_admin = is_colleague | is_taixiang_admin | rules.is_superuser

# 設(shè)置Rules

rules.add_rule('can_view_customer', is_colleague_or_taixiang_admin)
rules.add_rule('can_delete_customer', is_colleague_or_taixiang_admin)
rules.add_perm('can_change_customer', is_colleague_or_taixiang_admin)

# 設(shè)置Permissions

rules.add_perm('data_import.view_customer', is_colleague_or_taixiang_admin)
rules.add_perm('data_import.delete_customer', is_colleague_or_taixiang_admin)
rules.add_perm('data_import.add_customer', is_colleague_or_taixiang_admin)
rules.add_perm('data_import.change_customer', is_colleague_or_taixiang_admin)

2.3 admin.py以及adminx.py設(shè)置

如果使用原生的django-admin,admin.py做如下設(shè)置:

# admin.py
from __future__ import absolute_import

from django.contrib import admin
from rules.contrib.admin import ObjectPermissionsModelAdmin
from .models import Customer

# ModelAdmin class繼承ObjectPermissionsModelAdmin即可
class CustomerAdmin(ObjectPermissionsModelAdmin):
  pass

admin.site.register(Customer, CustomerAdmin)

使用xadmin,由于ObjectPermissionsModelAdmin無法直接使用,故參照源碼重寫has_change_permission和has_delete_permission方法即可。

注意:必須引用rules文件,權(quán)限規(guī)則才會生效,對于xadmin,添加

from .rules import *即可

# adminx.py
class CustomerAdmin(object):
  def has_change_permission(self, obj=None):
    codename = get_permission_codename('change', self.opts)
    return self.user.has_perm('%s.%s' % (self.app_label, codename), obj)

  def has_delete_permission(self, obj=None):
    codename = get_permission_codename('delete', self.opts)
    return self.user.has_perm('%s.%s' % (self.app_label, codename), obj)

  # 重寫queryset()或者get_list_display(),list view的權(quán)限也做到了對象級隔離
  def queryset(self):
    qs = super(CustomerAdmin, self).queryset()
    if self.request.user.is_superuser or is_taixiang_admin(self.request.user):
      return qs
    try:
      return qs.filter(company_code=self.request.user.companyuser.company_code)
    except AttributeError:
      return None

class CompanyUserAdmin(object):
  pass

xadmin.sites.site.register(Customer, CustomerAdmin)
xadmin.sites.site.register(CompanyUser, CompanyUserAdmin)

2.4 效果展示

CompanyUser設(shè)置:

使用Django-xadmin+rule對象級權(quán)限的實(shí)現(xiàn)案例

商家賬號只有所屬公司信息權(quán)限

使用Django-xadmin+rule對象級權(quán)限的實(shí)現(xiàn)案例

運(yùn)營人員擁有所有記錄權(quán)限

使用Django-xadmin+rule對象級權(quán)限的實(shí)現(xiàn)案例

補(bǔ)充知識:django 擴(kuò)展自帶權(quán)限,使其支持對象權(quán)限

擴(kuò)展django 自帶權(quán)限

說明

在不重寫 自帶權(quán)限的基礎(chǔ)上,完成支持對象權(quán)限,適用于小型項(xiàng)目。

歡迎提出修改意見

軟件支持

jsonfield

數(shù)據(jù)庫

新建3個(gè)表

from django.db import models
from django.contrib.auth.models import AbstractUser, Group ,User
 
from jsonfield import JSONField
 
class Request(models.Model):
  request = models.CharField(max_length=16, verbose_name='請求類型(大寫)')
 
  class Meta:
    db_table = "request"
    verbose_name = "請求類型"
    verbose_name_plural = verbose_name
 
  def __str__(self):
    return self.request
 
class RolePermission(models.Model):
  role = models.CharField(max_length=32, verbose_name='角色組')
  table = models.CharField(max_length=32, verbose_name='表名字')
  request = models.ManyToManyField(Request, verbose_name='請求', related_name='re', )
  permission = JSONField(max_length=1024, verbose_name='權(quán)限條件')
 
  class Meta:
    db_table = "role_permission"
    verbose_name = "角色組權(quán)限"
    verbose_name_plural = verbose_name
 
  def __str__(self):
    return self.role
 
class Role(models.Model):
  group = models.ForeignKey(Group, verbose_name='用戶組', on_delete=models.CASCADE)
  roles = models.ManyToManyField(RolePermission, verbose_name='角色組權(quán)限', blank=True,related_name='roles' )
 
  class Meta:
    db_table = "role"
    verbose_name = "角色組關(guān)系"
    verbose_name_plural = verbose_name
 
  def __str__(self):
    return self.group.name
system/models
Role         角色組關(guān)系  : 系統(tǒng)用戶組 <--> 角色組權(quán)限
Request       請求類型   : GET ,POST
RolePermission   角色組權(quán)限  : 角色 表名字 請求 權(quán)限條件(JSON類型)

重點(diǎn)為 RolePermission 表。

例子

以常見的資產(chǎn) asset 為例

表名字 asset   字段 groups   (分組 為 dev,ops)
權(quán)限劃分
新建用戶 hequan
新建組 dev

在Request 表 添加

GET (代表只讀)
POST (代表更新 刪除)

在RolePermission 添加

角色 asset-dev只讀
表名字assset
請求 GET
權(quán)限條件 {"groups":'dev'}

在Role 表中 添加

系統(tǒng)用戶組 dev
角色組權(quán)限 asset-dev只讀

權(quán)限驗(yàn)證代碼

import json
from system.models import Role
from functools import wraps
from django.shortcuts import HttpResponse
 
def role_permission_get_list(function):
  """
  列表頁面 控制權(quán)限
  :param function:
  :return:
  """
 
  @wraps(function)
  def wrapped(self):
    user = self.request.user
    groups = [x['name'] for x in self.request.user.groups.values()]
    request_type = self.request.method
    model = str(self.model._meta).split(".")[1]
 
    filter_dict = {}
    not_list = ['page', 'order_by', 'csrfmiddlewaretoken']
    for k, v in dict(self.request.GET).items():
      if [i for i in v if i != ''] and (k not in not_list):
        if '__in' in k:
          filter_dict[k] = v
        else:
          filter_dict[k] = v[0]
 
    if not user.is_superuser:
      role_groups = Role.objects.filter(group__name__in=groups).values_list('roles__table',
                                         'roles__request__request',
                                         'roles__permission')
 
      permission_dict = {}
      for i in role_groups:
        if i[0] == model and i[1] == request_type:
          permission_dict = json.loads(i[2])
 
      if permission_dict:
        if filter_dict:
          for k, v in permission_dict.items():
            if '__in' in k:
              k1 = k.replace('__in', '')
            if '__gt' in k:
              k1 = k.replace('__gt', '')
            if '__lt' in k:
              k1 = k.replace('__lt', '')
            else:
              k1 = k
            if k1 in list(filter_dict.keys()):
              del filter_dict[k1]
 
          if filter_dict:
            filter_dict.update(**permission_dict)
          else:
            print('查詢條件處理后為空,默認(rèn)權(quán)限')
            filter_dict = permission_dict
        else:
          print('查詢條件為空,默認(rèn)權(quán)限')
          filter_dict = permission_dict
      else:
        print('沒有權(quán)限')
        filter_dict = {'id': -1}
 
    self.filter_dict = filter_dict
    result = function(self)
    return result
 
  return wrapped
 
def role_permission_detail(function):
  """
  詳情頁面 控制權(quán)限
  :param function:
  :return:
  """
 
  @wraps(function)
  def wrapped(self, request, *args, **kwargs):
    user = self.request.user
 
    if not user.is_superuser:
      groups = [x['name'] for x in self.request.user.groups.values()]
      request_type = self.request.method
      model = str(self.model._meta).split(".")[1]
      pk = self.kwargs.get(self.pk_url_kwarg, None)
 
      role_groups = Role.objects.filter(group__name__in=groups).values_list('roles__table',
                                         'roles__request__request',
                                         'roles__permission')
 
      permission_dict = {}
      for i in role_groups:
        if i[0] == model and i[1] == request_type:
          permission_dict = json.loads(i[2])
 
      permission_dict['id'] = pk
      obj = self.model.objects.filter(**permission_dict).count()
      if not obj:
        return HttpResponse(status=403)
 
    result = function(self, request, *args, **kwargs)
    return result
 
  return wrapped
 
def role_permission_update_delete(function):
  """
  詳情頁面 控制權(quán)限
  :param function:
  :return:
  """
 
  @wraps(function)
  def wrapped(self, request):
    user = self.request.user
    if not user.is_superuser:
 
      groups = [x['name'] for x in self.request.user.groups.values()]
      request_type = self.request.method
      model = str(self.model._meta).split(".")[1]
      pk = self.request.POST.get('nid', None)
 
      role_groups = Role.objects.filter(group__name__in=groups).values_list('roles__table',
                                         'roles__request__request',
                                         'roles__permission')
 
      permission_dict = {}
      for i in role_groups:
        if i[0] == model and i[1] == request_type:
          permission_dict = json.loads(i[2])
 
      permission_dict['id'] = pk
      obj = self.model.objects.filter(**permission_dict).count()
      if not obj:
        ret = {'status': None, 'error': "沒有權(quán)限,拒絕", 'msg': 'Without permission, rejected'}
        return HttpResponse(json.dumps(ret))
 
    result = function(self, request)
    return result
 
  return wrapped

CBV 例子

省略部分代碼

class AssetListAll(LoginRequiredMixin, ListView):
  model = Ecs
 
  @role_permission_get_list
  def get_queryset(self):
    filter_dict = self.filter_dict
    self.queryset = self.model.objects.filter(**filter_dict)
    return self.queryset
class AssetChange(LoginRequiredMixin, UpdateView):
  model = Ecs
 
  @role_permission_detail
  def dispatch(self, request, *args, **kwargs):
    return super().dispatch(request, *args, **kwargs)
 
  @role_permission_update_delete
  def form_valid(self, form):
    self.object = form.save()
    return super().form_valid(form)
class AssetDetail(LoginRequiredMixin, DetailView):
  model = Ecs
 
  @role_permission_detail
  def dispatch(self, request, *args, **kwargs):
    return super().dispatch(request, *args, **kwargs)
class AssetDel(LoginRequiredMixin, View):
  model = Ecs
 
  @role_permission_update_delete
  def post(self, request):
    pass

關(guān)于“使用Django-xadmin+rule對象級權(quán)限的實(shí)現(xiàn)案例”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯(cuò),請把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI