溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Django+Vue實(shí)現(xiàn)WebSocket連接

發(fā)布時(shí)間:2020-06-22 07:12:33 來(lái)源:網(wǎng)絡(luò) 閱讀:8355 作者:迷失的貓妖 欄目:開(kāi)發(fā)技術(shù)

近期有一需求:前端頁(yè)面點(diǎn)擊執(zhí)行任務(wù),實(shí)時(shí)顯示后端執(zhí)行情況,思考一波;發(fā)現(xiàn)WebSocket最適合做這件事。

效果

測(cè)試ping www.baidu.com效果

點(diǎn)擊連接建立ws連接

Django+Vue實(shí)現(xiàn)WebSocket連接

后端實(shí)現(xiàn)

所需軟件包

后端主要借助DjangoChannels實(shí)現(xiàn)socket連接,官網(wǎng)文檔鏈接

這里想實(shí)現(xiàn)每個(gè)連接進(jìn)來(lái)加入組進(jìn)行廣播,所以還需要引入channels-redis。

pip

channels==2.2.0
channels-redis==2.4.0

引入

settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework.authtoken',
    'rest_framework',
                ...
    'channels',
]

# Redis配置
REDIS_HOST = ENV_DICT.get('REDIS_HOST', '127.0.0.1')
REDIS_PORT = ENV_DICT.get('REDIS_PORT', 6379)
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [(REDIS_HOST, REDIS_PORT)],
        },
    },
}

代碼

apps/consumers.py

新建一個(gè)消費(fèi)處理

實(shí)現(xiàn): 默認(rèn)連接加入組,發(fā)送信息時(shí)的處理。

from channels.generic.websocket import WebsocketConsumer

class MyConsumer(WebsocketConsumer):
    def connect(self):
        """
        每個(gè)任務(wù)作為一個(gè)頻道
        默認(rèn)進(jìn)入對(duì)應(yīng)任務(wù)執(zhí)行頻道
        """
        self.job_name = self.scope['url_route']['kwargs']['job_name']
        self.job_group_name = 'job_%s' % self.job_name
        async_to_sync(self.channel_layer.group_add)(
            self.job_group_name,
            self.channel_name
        )
        self.accept()

    def disconnect(self, close_code):
        async_to_sync(self.channel_layer.group_discard)(
            self.job_group_name,
            self.channel_name
        )

    # job.message類型處理
    def job_message(self, event):

        # 默認(rèn)發(fā)送收到信息
        self.send(text_data=event["text"])

apps/routing.py

ws類型路由

實(shí)現(xiàn):ws/job/<job_name>由MyConsumer去處理。

from . import consumers
from django.urls import path
from channels.routing import ProtocolTypeRouter,  URLRouter
from channels.sessions import SessionMiddlewareStack

application = ProtocolTypeRouter({
    'websocket': SessionMiddlewareStack(
        URLRouter(
          [
              path('ws/job/<str:job_name>', consumers.MyConsumer)
          ]
        )
    ),
})

apps/views.py

在執(zhí)行命令中獲取webSocket消費(fèi)通道,進(jìn)行異步推送

  • 使用異步推送async_to_sync是因?yàn)樵谶B接的時(shí)候采用的異步連接,所以推送必須采用異步推送。
  • 因?yàn)閳?zhí)行任務(wù)時(shí)間過(guò)長(zhǎng),啟動(dòng)觸發(fā)運(yùn)行時(shí)加入多線程,直接先返回ok,后端運(yùn)行任務(wù)。
from subprocess  import Popen,PIPE
import threading

def runPopen(job):
    """
    執(zhí)行命令,返回popen
    """
    path = os.path
    Path = path.abspath(path.join(BASE_DIR, path.pardir))
    script_path = path.abspath(path.join(Path,'run.sh'))
    cmd = "sh %s %s" % (script_path, job)
    return Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)

def runScript(job):
    channel_layer = get_channel_layer()
    group_name = "job_%s" % job

    popen = runPopen(job)
    while True:
        output = popen.stdout.readline()
        if output == '' and popen.poll() is not None:
            break

        if output:
            output_text = str(output.strip())
            async_to_sync(
                channel_layer.group_send
                )(
                    group_name, 
                    {"type": "job.message", "text": output_text}
                )
        else:
            err  = popen.stderr.readline()
            err_text = str(err.strip())
            async_to_sync(
                channel_layer.group_send
                )(
                    group_name,
                    {"type": "job.message", "text": err_text}
                )
            break

class StartJob(APIView):  
    def get(self, request, job=None):
        run =  threading.Thread(target=runScript, args=(job,))
        run.start()
        return HttpResponse('ok')

apps/urls.py

get請(qǐng)求就能啟動(dòng)任務(wù)

urlpatterns = [
                ...
    path('start_job/<str:job>', StartJob.as_view())
]

前端實(shí)現(xiàn)

所需軟件包

vue-native-websocket 

代碼實(shí)現(xiàn)

plugins/vueNativeWebsocket.js

import Vue from 'vue'
import VueNativeSock from '../utils/socket/Main.js'

export default function ({ store }) {
  Vue.use(VueNativeSock, 'http://localhost:8000/ws/job', {connectManually: true,});
}

nuxt.config.js

配置文件引入, 這里我使用的是nuxt框架

  plugins: [ 
      { 
        src: '@/plugins/vueNativeWebsocket.js', 
        ***: false 
      },
    ],

封裝socket

export default (connection_url, option) => {
    // 事件
    let event = ['message', 'close', 'error', 'open'];

    // 拷貝選項(xiàng)字典
    let opts = Object.assign({}, option);

    // 定義實(shí)例字典
    let instance = {

      // socket實(shí)例
      socket: '',

      // 是否連接狀態(tài)
      is_conncet: false,

      // 具體連接方法
      connect: function() {
        if(connection_url) {
          let scheme = window.location.protocol === 'https:' ? 'wss' : 'ws'
          connection_url = scheme + '://' + connection_url.split('://')[1];
          this.socket = new WebSocket(connection_url);
          this.initEvent();
        }else{
          console.log('wsurl為空');
        }
      },

      // 初始化事件
      initEvent: function() {
        for(let i = 0; i < event.length; i++){
          this.addListener(event[i]);
        }
      },

      // 判斷事件
      addListener: function(event) {
        this.socket.addEventListener(event, (e) => {
          switch(event){
            case 'open':
              this.is_conncet = true;
              break;
            case 'close':
              this.is_conncet = false;
              break;
          }
          typeof opts[event] == 'function' && opts[event](e);
        });
      },

      // 發(fā)送方法,失敗則回調(diào)
      send: function(data, closeCallback) {
        console.log('socket ---> ' + data)
        if(this.socket.readyState >= 2) {
          console.log('ws已經(jīng)關(guān)閉');
          closeCallback && closeCallback();
        }else{
          this.socket.send(data);
        }
      }

    };

    // 調(diào)用連接方法
    instance.connect();
    return instance;
  }

index.vue

具體代碼

  • x2Str方法,因?yàn)楹蠖朔祷氐氖莃ytes,格式b'xxx',編寫了方法對(duì)其進(jìn)行轉(zhuǎn)換。
<template>
        <div>

                <el-button type="primary" @click="runFunction" >執(zhí)行</el-button>
                <el-button type="primary"  @click="connectWebSock" >顯示</el-button>

    <div class="socketView">
      <span v-for="i in socketMessage" :key="i">{{i}}</span>
    </div>
  </div>
</template>
<script>
  import R from '@/plugins/axios';
  import ws from '@/plugins/socket'
  export default {
    data() {
      return {
        webSocket: '',
        socketMessage: [],
      }
    },

        methods: {
          // 打開(kāi)連接的處理
      openSocket(e) {
        if (e.isTrusted) {
          const h = this.$createElement;
          this.$notify({
            title: '提示',
            message: h('i', { style: 'color: teal'}, '已建立Socket連接')
          });
        }
      },

    // 連接時(shí)的處理
    listenSocket(e) {
      if (e.data){
        this.socketMessage.push(this.x2Str(e.data))
      }
    },

    // 連接webSocket
                connectWebSock() {
      let wsuri = process.env.BACKEND_URL + '/ws/job/' + this.selectFunctions
      this.webSocket = ws(wsuri, {
        open: e => this.openSocket(e),
        message: e => this.listenSocket(e),
        close: e => this.closeSocket(e)
      })
    },

          // 轉(zhuǎn)碼
    x2Str(str) {
      if (str) {
        let reg = new RegExp("(?<=^b').*(?='$)")
        let result = str.replace(/(?:\\x[\da-fA-F]{2})+/g, m =>
          decodeURIComponent(m.replace(/\\x/g, '%'))
        )
        return reg.exec(result)[0]
      }
    },

    // 執(zhí)行方法
    runFunction() {
      R.myRequest('GET','api/start_job/' + this.selectFunctions, {}, {}).then((response) => {
        if (response.hasOwnProperty('response')){
            this.$message({
            type: 'error',
            message: '服務(wù)端返回錯(cuò)誤,返回碼:' + response.response.status 
            });
        }; 
        if (response.data == 'ok') {
            this.$message({
              type: 'success',
              message: '開(kāi)始執(zhí)行[' + this.selectFunctions + ']'
            });
        }
      });
    }      
    }
}
</script>

至此,實(shí)現(xiàn)前后端websocket通訊。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI