溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

使用python實(shí)現(xiàn)簡單聊天室功能的案例

發(fā)布時(shí)間:2021-04-07 09:46:57 來源:億速云 閱讀:288 作者:小新 欄目:開發(fā)技術(shù)

這篇文章給大家分享的是有關(guān)使用python實(shí)現(xiàn)簡單聊天室功能的案例的內(nèi)容。小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過來看看吧。

具體內(nèi)容如下

公共模塊

首先寫一個(gè)公共類,用字典的形式對(duì)數(shù)據(jù)的收發(fā),并且進(jìn)行封裝,導(dǎo)入struct解決了TCP的粘包問題,并在公共類中進(jìn)行了異常處理

import socket,struct,json
def send_dic(c,dic):
 dic_json=json.dumps(dic)
 dic_json_length=len(dic_json.encode('utf-8'))
 struct_dic_json_length=struct.pack('q',dic_json_length)
 c.send(struct_dic_json_length)
 c.send(dic_json.encode('utf-8'))
def get_dic(c):
 try:
  dic_length=struct.unpack('q',c.recv(8))[0]
 except:
  return {'msg':'exit'}
 try:
  dic_json=c.recv(dic_length).decode('utf-8')
 except:
  return {'msg':'exit'}
 dic=json.loads(dic_json)
 return dic

服務(wù)器

import socket
from concurrent.futures import ThreadPoolExecutor
import lib.common #導(dǎo)入寫在lib里面的公共模塊,代碼在上面
import re
#進(jìn)行開啟服務(wù)器等一系列操作
s=socket.socket()
ip_host=('127.0.0.1',8000)
s.bind(ip_host)
s.listen()
#創(chuàng)建一個(gè)列表,用來保存客戶端及其信息
c_list=[]
def get_send_msg(c,addr,c_list):
 while True:
  tag=False
  dic=lib.common.get_dic(c)
  if dic['msg']=='exit':
   #如果接受出異常,或是客戶端主動(dòng)輸入為exit,在列表中移除客戶端信息
   for i in c_list:
    if i['addr']==addr:
     c_list.remove(i)
   break
  if dic['is_siliao']==True:
   #客戶端發(fā)來的字典里面如果is_siliao==True,進(jìn)入私聊代碼
   for i in c_list:
    #遍歷列表,并用正則表達(dá)式截取信息
    li=re.findall('(.*?)@%s(.*)'%i['name'],dic['msg'])
    if len(li)!=0:
     dic['msg']=li[0][0]+li[0][1]
     lib.common.send_dic(i['client'],dic)
     tag=True
     break
  if tag:
   continue
  #如果不是私聊,進(jìn)入下面代碼,在聊天室進(jìn)行群聊
  for i in c_list:
   if i['addr']!=addr:
    lib.common.send_dic(i['client'],dic)
while True:
 #用線程池,進(jìn)行多次連接
 print('客戶端等待連接')
 c,addr=s.accept()
 print('%s連接了服務(wù)器'%addr[1])
 name=c.recv(1024).decode('utf-8')#進(jìn)行第一次接受,接受客戶端的名字,為私聊的功能做準(zhǔn)備
 c_dic={'addr':addr,'client':c,'name':name}#將客戶端的信息保存在字典中
 c_list.append(c_dic)#將字典加入列表
 t=ThreadPoolExecutor()
 t.submit(get_send_msg,c,addr,c_list)

客戶端:

import lib.common
from concurrent.futures import ThreadPoolExecutor
c=socket.socket()
ip_host=('127.0.0.1',8000)
c.connect(ip_host)
def send_msg(c,name):
 while True:
  msg = input ('>>:').strip ()
  is_siliao=False
  if not msg:
   continue
  # if msg.startswith('@'):
  if '@'in msg:
   is_siliao=True
  dic = {'msg': msg,'name':name,'is_siliao':is_siliao}
  lib.common.send_dic(c,dic)
  if msg=='exit':
   c.close ()
   break
def get_msg(c):
 while True:
  dic=lib.common.get_dic(c)
  if dic['is_siliao']==True:
   print('來自%s的私聊:%s'%(dic['name'],dic['msg']))
   continue
  print('%s:%s'%(dic['name'],dic['msg']))
t=ThreadPoolExecutor()
name=input('你的聊天名字:').strip()
c.send(name.encode('utf-8'))
t.submit(send_msg,c,name)
t.submit(get_msg,c)

運(yùn)行代碼截圖:

使用python實(shí)現(xiàn)簡單聊天室功能的案例

使用python實(shí)現(xiàn)簡單聊天室功能的案例

使用python實(shí)現(xiàn)簡單聊天室功能的案例

使用python實(shí)現(xiàn)簡單聊天室功能的案例

使用python實(shí)現(xiàn)簡單聊天室功能的案例

感謝各位的閱讀!關(guān)于“使用python實(shí)現(xiàn)簡單聊天室功能的案例”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI