溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Django+Vue如何實現(xiàn)WebSocket連接

發(fā)布時間:2021-04-02 11:04:48 來源:億速云 閱讀:513 作者:小新 欄目:web開發(fā)

這篇文章給大家分享的是有關(guān)Django+Vue如何實現(xiàn)WebSocket連接的內(nèi)容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

效果

測試 ping www.baidu.com 效果

點擊連接建立ws連接

Django+Vue如何實現(xiàn)WebSocket連接

后端實現(xiàn)

所需軟件包

后端主要借助Django Channels 實現(xiàn)socket連接,官網(wǎng)文檔鏈接

這里想實現(xiàn)每個連接進(jìn)來加入組進(jìn)行廣播,所以還需要引入 channels-redis 。

pip

channels==2.2.0
channels-redis==2.4.0

引入

settings.py

INSTALLED_APPS = [
  'django.contrib.admin',
  'django.contrib.auth',
  'django.contrib.contenttypes',
  'django.contrib.sessions',
  'django.contrib.messages',
  'django.contrib.staticfiles',
  'rest_framework.authtoken',
  'rest_framework',
        ...
  'channels',
]

# Redis配置
REDIS_HOST = ENV_DICT.get('REDIS_HOST', '127.0.0.1')
REDIS_PORT = ENV_DICT.get('REDIS_PORT', 6379)
CHANNEL_LAYERS = {
  "default": {
    "BACKEND": "channels_redis.core.RedisChannelLayer",
    "CONFIG": {
      "hosts": [(REDIS_HOST, REDIS_PORT)],
    },
  },
}

代碼

apps/consumers.py

新建一個消費處理

實現(xiàn): 默認(rèn)連接加入組,發(fā)送信息時的處理。

from channels.generic.websocket import WebsocketConsumer

class MyConsumer(WebsocketConsumer):
  def connect(self):
    """
    每個任務(wù)作為一個頻道
    默認(rèn)進(jìn)入對應(yīng)任務(wù)執(zhí)行頻道
    """
    self.job_name = self.scope['url_route']['kwargs']['job_name']
    self.job_group_name = 'job_%s' % self.job_name
    async_to_sync(self.channel_layer.group_add)(
      self.job_group_name,
      self.channel_name
    )
    self.accept()

  def disconnect(self, close_code):
    async_to_sync(self.channel_layer.group_discard)(
      self.job_group_name,
      self.channel_name
    )

  # job.message類型處理
  def job_message(self, event):

    # 默認(rèn)發(fā)送收到信息
    self.send(text_data=event["text"])

apps/routing.py

ws類型路由

實現(xiàn):ws/job/<job_name>由 MyConsumer 去處理。

from . import consumers
from django.urls import path
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.sessions import SessionMiddlewareStack

application = ProtocolTypeRouter({
  'websocket': SessionMiddlewareStack(
    URLRouter(
     [
       path('ws/job/<str:job_name>', consumers.MyConsumer)
     ]
    )
  ),
})

apps/views.py

在執(zhí)行命令中獲取 webSocket 消費通道,進(jìn)行異步推送

  • 使用異步推送async_to_sync是因為在連接的時候采用的異步連接,所以推送必須采用異步推送。

  • 因為執(zhí)行任務(wù)時間過長,啟動觸發(fā)運(yùn)行時加入多線程,直接先返回ok,后端運(yùn)行任務(wù)。

from subprocess import Popen,PIPE
import threading

def runPopen(job):
  """
  執(zhí)行命令,返回popen
  """
  path = os.path
  Path = path.abspath(path.join(BASE_DIR, path.pardir))
  script_path = path.abspath(path.join(Path,'run.sh'))
  cmd = "sh %s %s" % (script_path, job)
  return Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)

def runScript(job):
  channel_layer = get_channel_layer()
  group_name = "job_%s" % job

  popen = runPopen(job)
  while True:
    output = popen.stdout.readline()
    if output == '' and popen.poll() is not None:
      break

    if output:
      output_text = str(output.strip())
      async_to_sync(
        channel_layer.group_send
        )(
          group_name, 
          {"type": "job.message", "text": output_text}
        )
    else:
      err = popen.stderr.readline()
      err_text = str(err.strip())
      async_to_sync(
        channel_layer.group_send
        )(
          group_name,
          {"type": "job.message", "text": err_text}
        )
      break

class StartJob(APIView): 
  def get(self, request, job=None):
    run = threading.Thread(target=runScript, args=(job,))
    run.start()
    return HttpResponse('ok')

apps/urls.py

get請求就能啟動任務(wù)

urlpatterns = [
        ...
  path('start_job/<str:job>', StartJob.as_view())
]

前端實現(xiàn)

所需軟件包

vue-native-websocket

代碼實現(xiàn)

plugins/vueNativeWebsocket.js

import Vue from 'vue'
import VueNativeSock from '../utils/socket/Main.js'

export default function ({ store }) {
 Vue.use(VueNativeSock, 'http://localhost:8000/ws/job', {connectManually: true,});
}

nuxt.config.js

配置文件引入, 這里我使用的是 nuxt 框架

 plugins: [ 
   { 
    src: '@/plugins/vueNativeWebsocket.js', 
    ***: false 
   },
  ],

封裝 socket

export default (connection_url, option) => {
  // 事件
  let event = ['message', 'close', 'error', 'open'];

  // 拷貝選項字典
  let opts = Object.assign({}, option);

  // 定義實例字典
  let instance = {

   // socket實例
   socket: '',

   // 是否連接狀態(tài)
   is_conncet: false,

   // 具體連接方法
   connect: function() {
    if(connection_url) {
     let scheme = window.location.protocol === 'https:' ? 'wss' : 'ws'
     connection_url = scheme + '://' + connection_url.split('://')[1];
     this.socket = new WebSocket(connection_url);
     this.initEvent();
    }else{
     console.log('wsurl為空');
    }
   },

   // 初始化事件
   initEvent: function() {
    for(let i = 0; i < event.length; i++){
     this.addListener(event[i]);
    }
   },

   // 判斷事件
   addListener: function(event) {
    this.socket.addEventListener(event, (e) => {
     switch(event){
      case 'open':
       this.is_conncet = true;
       break;
      case 'close':
       this.is_conncet = false;
       break;
     }
     typeof opts[event] == 'function' && opts[event](e);
    });
   },

   // 發(fā)送方法,失敗則回調(diào)
   send: function(data, closeCallback) {
    console.log('socket ---> ' + data)
    if(this.socket.readyState >= 2) {
     console.log('ws已經(jīng)關(guān)閉');
     closeCallback && closeCallback();
    }else{
     this.socket.send(data);
    }
   }

  };

  // 調(diào)用連接方法
  instance.connect();
  return instance;
 }

index.vue

具體代碼

x2Str 方法,因為后端返回的是bytes,格式 b'xxx' ,編寫了方法對其進(jìn)行轉(zhuǎn)換。

<template>
    <div>

        <el-button type="primary" @click="runFunction" >執(zhí)行</el-button>
        <el-button type="primary" @click="connectWebSock" >顯示</el-button>

  <div class="socketView">
   <span v-for="i in socketMessage" :key="i">{{i}}</span>
  </div>
 </div>
</template>
<script>
 import R from '@/plugins/axios';
 import ws from '@/plugins/socket'
 export default {
  data() {
   return {
    webSocket: '',
    socketMessage: [],
   }
  },

    methods: {
     // 打開連接的處理
   openSocket(e) {
    if (e.isTrusted) {
     const h = this.$createElement;
     this.$notify({
      title: '提示',
      message: h('i', { style: 'color: teal'}, '已建立Socket連接')
     });
    }
   },

  // 連接時的處理
  listenSocket(e) {
   if (e.data){
    this.socketMessage.push(this.x2Str(e.data))
   }
  },

  // 連接webSocket
        connectWebSock() {
   let wsuri = process.env.BACKEND_URL + '/ws/job/' + this.selectFunctions
   this.webSocket = ws(wsuri, {
    open: e => this.openSocket(e),
    message: e => this.listenSocket(e),
    close: e => this.closeSocket(e)
   })
  },

     // 轉(zhuǎn)碼
  x2Str(str) {
   if (str) {
    let reg = new RegExp("(?<=^b').*(?='$)")
    let result = str.replace(/(?:\\x[\da-fA-F]{2})+/g, m =>
     decodeURIComponent(m.replace(/\\x/g, '%'))
    )
    return reg.exec(result)[0]
   }
  },

  // 執(zhí)行方法
  runFunction() {
   R.myRequest('GET','api/start_job/' + this.selectFunctions, {}, {}).then((response) => {
    if (response.hasOwnProperty('response')){
      this.$message({
      type: 'error',
      message: '服務(wù)端返回錯誤,返回碼:' + response.response.status 
      });
    }; 
    if (response.data == 'ok') {
      this.$message({
       type: 'success',
       message: '開始執(zhí)行[' + this.selectFunctions + ']'
      });
    }
   });
  }   
  }
}
</script>

感謝各位的閱讀!關(guān)于“Django+Vue如何實現(xiàn)WebSocket連接”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學(xué)到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI