您好,登錄后才能下訂單哦!
本文小編為大家詳細(xì)介紹“怎么用python快速搭建redis集群”,內(nèi)容詳細(xì),步驟清晰,細(xì)節(jié)處理妥當(dāng),希望這篇“怎么用python快速搭建redis集群”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學(xué)習(xí)新知識(shí)吧。
列出主要的點(diǎn),便于對(duì)于下面程序的理解。
Redis在TCP端口6379(默認(rèn)端口,在配置可以修改)上監(jiān)聽到來的連接,在客戶端與服務(wù)器端之間傳輸?shù)拿總€(gè)Redis命令或者數(shù)據(jù)都以rn結(jié)尾。
Redis用不同的回復(fù)類型回復(fù)命令。它可能從服務(wù)器發(fā)送的第一個(gè)字節(jié)開始校驗(yàn)回復(fù)類型:
* 用單行回復(fù)(狀態(tài)回復(fù)),回復(fù)的第一個(gè)字節(jié)將是“+”
* 錯(cuò)誤消息,回復(fù)的第一個(gè)字節(jié)將是“-”
* 整型數(shù)字,回復(fù)的第一個(gè)字節(jié)將是“:”
* 批量回復(fù),回復(fù)的第一個(gè)字節(jié)將是“$”
* 多個(gè)批量回復(fù),回復(fù)的第一個(gè)字節(jié)將是“*”
批量回復(fù)被服務(wù)器用于返回一個(gè)單二進(jìn)制安全字符串。
C: GET mykey
S: $6rnfoobarrn
服務(wù)器發(fā)送第一行回復(fù),該行以“$”開始后面跟隨實(shí)際要發(fā)送的字節(jié)數(shù),隨后是CRLF,然后發(fā)送實(shí)際數(shù)據(jù),隨后是2個(gè)字節(jié)的額外數(shù)據(jù)用于最后的CRLF。服務(wù)器發(fā)送的準(zhǔn)確序列如下:
”$6rnfoobarrn”
如果請(qǐng)求的值不存在,批量回復(fù)將使用特殊的值-1來作為數(shù)據(jù)長度,例如:
C: GET nonexistingkey
S: $-1
當(dāng)請(qǐng)求的對(duì)象不存在時(shí),客戶端庫API不會(huì)返回空字符串,而會(huì)返回空對(duì)象。例如:Ruby庫返回‘nil’,而C庫返回NULL(或者在回復(fù)的對(duì)象里設(shè)置指定的標(biāo)志)等等。
簡單說下二進(jìn)制,就是會(huì)包含,所以C語言在處理的時(shí)候,就不能用str函數(shù),像strlen、strcpy等,因?yàn)樗鼈兌际且詠砼袛嘧址Y(jié)尾的。
官網(wǎng)也介紹了怎么搭建redis集群,試過比較麻煩,因?yàn)橛玫腸entos6.5,如果用較新的centos,可能會(huì)好點(diǎn)。
Redis 集群沒有使用一致性hash, 而是引入了 哈希槽的概念.
Redis 集群有16384個(gè)哈希槽,每個(gè)key通過CRC16校驗(yàn)后對(duì)16384取模來決定放置哪個(gè)槽.集群的每個(gè)節(jié)點(diǎn)負(fù)責(zé)一部分hash槽,舉個(gè)例子,比如當(dāng)前集群有3個(gè)節(jié)點(diǎn),那么:
* 節(jié)點(diǎn) A 包含 0 到 5500號(hào)哈希槽.
* 節(jié)點(diǎn) B 包含5501 到 11000 號(hào)哈希槽.
* 節(jié)點(diǎn) C 包含11001 到 16384號(hào)哈希槽.
這種結(jié)構(gòu)很容易添加或者刪除節(jié)點(diǎn). 比如如果我想新添加個(gè)節(jié)點(diǎn)D, 我需要從節(jié)點(diǎn) A, B, C中得部分槽到D上. 如果我想移除節(jié)點(diǎn)A,需要將A中的槽移到B和C節(jié)點(diǎn)上,然后將沒有任何槽的A節(jié)點(diǎn)從集群中移除即可. 由于從一個(gè)節(jié)點(diǎn)將哈希槽移動(dòng)到另一個(gè)節(jié)點(diǎn)并不會(huì)停止服務(wù),所以無論添加刪除或者改變某個(gè)節(jié)點(diǎn)的哈希槽的數(shù)量都不會(huì)造成集群不可用的狀態(tài).
在 Redis 集群中,節(jié)點(diǎn)負(fù)責(zé)存儲(chǔ)數(shù)據(jù)、記錄集群的狀態(tài)(包括鍵值到正確節(jié)點(diǎn)的映射)。集群節(jié)點(diǎn)同樣能自動(dòng)發(fā)現(xiàn)其他節(jié)點(diǎn),檢測(cè)出沒正常工作的節(jié)點(diǎn), 并且在需要的時(shí)候在從節(jié)點(diǎn)中推選出主節(jié)點(diǎn)。
為了執(zhí)行這些任務(wù),所有的集群節(jié)點(diǎn)都通過TCP連接(TCP bus?)和一個(gè)二進(jìn)制協(xié)議(集群連接,cluster bus)建立通信。 每一個(gè)節(jié)點(diǎn)都通過集群連接(cluster bus)與集群上的其余每個(gè)節(jié)點(diǎn)連接起來?! 」?jié)點(diǎn)們使用一個(gè) gossip 協(xié)議來傳播集群的信息,這樣可以:發(fā)現(xiàn)新的節(jié)點(diǎn)、 發(fā)送ping包(用來確保所有節(jié)點(diǎn)都在正常工作中)、在特定情況發(fā)生時(shí)發(fā)送集群消息。集群連接也用于在集群中發(fā)布或訂閱消息。
由于集群節(jié)點(diǎn)不能代理(proxy)請(qǐng)求,所以客戶端在接收到重定向錯(cuò)誤(redirections errors) -MOVED 和 -ASK 的時(shí)候, 將命令重定向到其他節(jié)點(diǎn)。理論上來說,客戶端是可以自由地向集群中的所有節(jié)點(diǎn)發(fā)送請(qǐng)求,在需要的時(shí)候把請(qǐng)求重定向到其他節(jié)點(diǎn),所以客戶端是不需要保存集群狀態(tài)。 不過客戶端可以緩存鍵值和節(jié)點(diǎn)之間的映射關(guān)系,這樣能明顯提高命令執(zhí)行的效率。
簡單說下返回-MOVED的情況,就是客戶端連節(jié)點(diǎn)A請(qǐng)求處理key,但其實(shí)key其實(shí)在節(jié)點(diǎn)B,就返回-MOVED,協(xié)議如下:-MOVED 3999 127.0.0.1:6381
不用考慮-ASK的情況。
代碼如下:
#include <string.h>#include <sys/socket.h>#include <arpa/inet.h>#include <errno.h>#include <fcntl.h>#include <netdb.h>#include <sys/poll.h>#include <unistd.h>#include <sys/types.h>#include <stdlib.h>#include <stdio.h>ssize_t sock_write_loop( int fd, const void *vptr, size_t n ) { size_t nleft = 0; ssize_t nwritten = 0;const char *ptr; ptr = (char *) vptr; nleft = n;while( nleft > 0 ) {if( (nwritten = write(fd, ptr, nleft) ) <= 0 ) {if( errno == EINTR ) { nwritten = 0; //再次調(diào)用write }else{return -5; } } nleft = nleft - nwritten; ptr = ptr + nwritten; }return(n); }int sock_read_wait( int fd, int timeout ) {struct pollfd pfd; pfd.fd = fd; pfd.events = POLLIN; pfd.revents = 0; timeout *= 1000;for (;;) {switch( poll(&pfd, 1, timeout) ) {case -1:if( errno != EINTR ) {return (-2); }continue;case 0: errno = ETIMEDOUT;return (-1);default:if( pfd.revents & POLLIN )return (0);elsereturn (-3); } } } ssize_t sock_read_tmo( int fd, void *vptr, size_t len, int timeout ) { if( timeout > 0 && sock_read_wait(fd, timeout) < 0 )return (-1);elsereturn (read(fd, vptr, len)); }int sock_connect_nore(const char *IPaddr , int port , int timeout) { // char temp[4096];int sock_fd = 0, n = 0, errcode = 0;struct sockaddr_in servaddr;if( IPaddr == NULL ) {return -1; }if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 ) {return -1; } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(port);//changed by navy 2003.3.3 for support domain addr//if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 ) {//added by navy 2003.3.31 for support domain addrstruct hostent* pHost = NULL, host;char sBuf[2048], sHostIp[17];int h_errnop = 0; memset(&host, 0, sizeof(host)); memset(sBuf, 0, sizeof(sBuf)); memset(sHostIp, 0 , sizeof(sHostIp)); pHost = &host; #ifdef _SOLARIS_PLAT//solarisif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) || #else//linuxif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) || #endif(pHost == NULL) ) { close(sock_fd);return -1; }if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 ) { close(sock_fd);return -1; }//目前僅取第一個(gè)IP地址if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL ) { close(sock_fd);return -1; } if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 ) { close(sock_fd); return -1; }//end added by navy 2003.3.31 }if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, sizeof(servaddr), timeout) ) < 0 ) { close(sock_fd); return -1; }return sock_fd; }int sock_connect(const char *IPaddr , int port , int timeout) {char temp[4096];int sock_fd = 0, n = 0, errcode = 0;struct sockaddr_in servaddr;if( IPaddr == NULL ) {return -1; }if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 ) {return -1; } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(port);//changed by navy 2003.3.3 for support domain addr//if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 ) {//added by navy 2003.3.31 for support domain addrstruct hostent* pHost = NULL, host;char sBuf[2048], sHostIp[17];int h_errnop = 0; memset(&host, 0, sizeof(host)); memset(sBuf, 0, sizeof(sBuf)); memset(sHostIp, 0 , sizeof(sHostIp)); pHost = &host; #ifdef _SOLARIS_PLAT//solarisif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) || #else//linuxif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) || #endif(pHost == NULL) ) { close(sock_fd);return -1; }if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 ) { close(sock_fd);return -1; }//目前僅取第一個(gè)IP地址if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL ) { close(sock_fd);return -1; } if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 ) { close(sock_fd); return -1; }//end added by navy 2003.3.31 }if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, sizeof(servaddr), timeout) ) < 0 ) { close(sock_fd); return -1; } n = sock_read_tmo(sock_fd, temp, 4096, timeout);//一般錯(cuò)誤if( n <= 0 ) { close(sock_fd); sock_fd = -1; }return sock_fd; }int sock_non_blocking(int fd, int on) {int flags;if ((flags = fcntl(fd, F_GETFL, 0)) < 0){return -10; }if (fcntl(fd, F_SETFL, on ? flags | O_NONBLOCK : flags & ~O_NONBLOCK) < 0){return -10; }return 0; }int sock_write_wait(int fd, int timeout) {struct pollfd pfd; pfd.fd = fd; pfd.events = POLLOUT; pfd.revents = 0; timeout *= 1000;for (;;) {switch( poll(&pfd, 1, timeout) ) {case -1:if( errno != EINTR ) {return (-2); }continue;case 0: errno = ETIMEDOUT;return (-1);default:if( pfd.revents & POLLOUT )return (0);elsereturn (-3); } } }int sock_timed_connect(int sock, struct sockaddr * sa, int len, int timeout) {int error = 0; socklen_t error_len; sock_non_blocking(sock, 1);if( connect(sock, sa, len) == 0 ) { sock_non_blocking(sock, 0);return (0); }if( errno != EINPROGRESS ) { sock_non_blocking(sock, 0);return (-1); }/* * A connection is in progress. Wait for a limited amount of time for * something to happen. If nothing happens, report an error. */if( sock_write_wait(sock, timeout) != 0) { sock_non_blocking(sock, 0);return (-2); }/* * Something happened. Some Solaris 2 versions have getsockopt() itself * return the error, instead of returning it via the parameter list. */error = 0; error_len = sizeof(error);if( getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *) &error, &error_len) != 0 ) { sock_non_blocking(sock, 0);return (-3); }if( error ) { errno = error; sock_non_blocking(sock, 0);return (-4); } sock_non_blocking(sock, 0);/* * No problems. */return (0); }static int check_ip_in_list(const char *ip, char *iplist) { char *token = NULL;char *saveptr = NULL; token = strtok_r(iplist, ",", &saveptr);while(token != NULL) { char *ptmp = NULL; char *ip_mask = strtok_r(token, "/", &ptmp);if(!ip_mask) return -1; char *ip_bit = strtok_r(NULL, "/", &ptmp); if(ip_bit) {int mask_bit = atoi(ip_bit);if(mask_bit < 0 || mask_bit >32)continue; unsigned long addr[4] = { 0 }; sscanf( ip_mask, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 ); unsigned long vl1 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3]; sscanf( ip, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 ); unsigned long vl2 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3]; vl1 = ( vl1 >> ( 32 - mask_bit ) ); vl2 = ( vl2 >> ( 32 - mask_bit ) );if( vl1 == vl2 ) return 1; }else{if(strcmp(ip,ip_mask) == 0) return 1; } token = strtok_r(NULL, ",", &saveptr); } return 0; }static int check_ip_in_redis(const char *redis_host, const char *ip,const char *rq_pro) {char buf[128];int loops = 0; strcpy(buf, redis_host); do{ loops ++;char *ptmp = NULL;char *host = strtok_r(buf, ":", &ptmp);if(!host) return -1;char *s_port = strtok_r(NULL, ":", &ptmp);if(!s_port) return -1;int port = atoi(s_port);char respone[40] = {0};int sock_fd = -1;if((sock_fd = sock_connect_nore(host, port, 5))<0)return -1;if(sock_write_loop(sock_fd, rq_pro, strlen(rq_pro)) != strlen(rq_pro)) { close(sock_fd);return -1; }if(sock_read_tmo(sock_fd, respone, sizeof(respone)-1, 5)<=0) { close(sock_fd);return -1; } if(strncmp(":0", respone, 2) == 0) { close(sock_fd);return 0; } else if(strncmp(":1", respone, 2) == 0) { close(sock_fd);return 1; } else if(strncmp("$", respone, 1) == 0) { int data_size = 0; int ret = 0;char *data_line = strstr(respone,"rn");if(!data_line) { close(sock_fd);return -1; } data_line = data_line+2; data_size = atoi(respone+1);if(data_size == -1) { close(sock_fd);return 0; }if(strlen(data_line) == data_size+2) { printf("line = %d, data_line = %sn",__LINE__,data_line); ret=check_ip_in_list(ip, data_line); close(sock_fd);return ret; }char *data = calloc(data_size+3,1);if(!data) { close(sock_fd);return -1; } strcpy(data,data_line);int read_size = strlen(data);int left_size = data_size + 2 - read_size;while(left_size > 0) {int nread = sock_read_tmo(sock_fd, data+read_size, left_size, 5);if(nread<=0) {free(data); close(sock_fd); return -1; } read_size += nread; left_size -= nread; } close(sock_fd); printf("line = %d, data = %sn",__LINE__,data); ret=check_ip_in_list(ip, data);free(data);return ret; } else if(strncmp("-MOVED", respone, 6) == 0) { close(sock_fd);char *p = strchr(respone, ' ');if(p == NULL)return -1; p = strchr(p+1, ' ');if(p == NULL)return -1; strcpy(buf, p+1); }else{ close(sock_fd);return -1; } }while(loops < 2);return -1; }int main(int argc,char *argv[]) {if(argc != 2) { printf("please input ipn");return -1; } const char *redis_ip = "127.0.0.1:7002";const char *domain = "test.com";char exist_pro[128] = {0};char get_pro[128] = {0}; snprintf(exist_pro,sizeof(exist_pro),"EXISTS test|%s|%srn",domain,"127.0.0.1"); snprintf(get_pro,sizeof(get_pro),"GET test_%srn",domain);int loops = 0;int ret = 0;do{ loops ++; ret = check_ip_in_redis(redis_ip, argv[1],exist_pro);if(ret == 0) ret = check_ip_in_redis(redis_ip, argv[1],get_pro); }while(loops < 3 && ret < 0); printf("line = %d, ret = %dn",__LINE__,ret);return ret; }
c_redis_cli.c
主要看這個(gè)check_ip_in_redis函數(shù)就行了,其它都是一些socket的封裝。
#!/usr/bin/pythonimport sys import socketdef main(argv):if(len(argv) != 3):print "please input domain ip!"returnhost = "192.168.188.47" port = 7002while 1: s = socket.socket() s.connect((host, port)) cmd = 'set %s_white_ip %srn' % (argv[1],argv[2]) s.send(cmd) res = s.recv(32) s.close() if res[0] == "+":print "set domain white ip suc!"return elif res[0:6] == "-MOVED": list = res.split(" ") ip_list = list[2].split(":") host = ip_list[0] port = int(ip_list[1]) else:print "set domain white ip error!"return if __name__ == "__main__": main(sys.argv)
讀到這里,這篇“怎么用python快速搭建redis集群”文章已經(jīng)介紹完畢,想要掌握這篇文章的知識(shí)點(diǎn)還需要大家自己動(dòng)手實(shí)踐使用過才能領(lǐng)會(huì),如果想了解更多相關(guān)內(nèi)容的文章,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。