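This article explains how to enable multithreading in Flask.
In an earlier post I explained that Flask's support for multithreading rests mainly on two classes, LocalStack and Local. Local has two attributes, __storage__ and __ident_func__; the latter returns the current thread id, which is how requests handled by different threads are kept apart.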
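To make the role of __storage__ and __ident_func__ concrete, here is a minimal sketch of a thread-keyed storage object. MiniLocal and demo are hypothetical names invented for this illustration; this is a simplified stand-in, not werkzeug's actual Local implementation.

```python
import threading


class MiniLocal:
    """Hypothetical, simplified stand-in for a thread-local object."""

    def __init__(self):
        # Use object.__setattr__ so the two internals land in the instance
        # dict instead of going through our own __setattr__ below.
        object.__setattr__(self, "__storage__", {})
        object.__setattr__(self, "__ident_func__", threading.get_ident)

    def __setattr__(self, name, value):
        # Every attribute is filed under the id of the calling thread.
        ident = self.__ident_func__()
        self.__storage__.setdefault(ident, {})[name] = value

    def __getattr__(self, name):
        # Only called for names not found normally, i.e. user attributes.
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)


def demo():
    local = MiniLocal()
    local.user = "main"
    seen = {}

    def worker():
        # The worker thread has its own slot, so it cannot see "main".
        seen["before"] = hasattr(local, "user")
        local.user = "worker"
        seen["after"] = local.user

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    return local.user, seen
```

Calling demo() shows that the value set in the main thread is untouched by the worker, because each thread reads and writes only the slot selected by its own id.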
This time the topic is how to turn multithreading on.
Let's start with the app.run() method:
def run(self, host=None, port=None, debug=None, **options):
    from werkzeug.serving import run_simple
    if host is None:
        host = '127.0.0.1'
    if port is None:
        server_name = self.config['SERVER_NAME']
        if server_name and ':' in server_name:
            port = int(server_name.rsplit(':', 1)[1])
        else:
            port = 5000
    if debug is not None:
        self.debug = bool(debug)
    options.setdefault('use_reloader', self.debug)
    options.setdefault('use_debugger', self.debug)
    try:
        run_simple(host, port, self, **options)  # execution continues in this function
    finally:
        # reset the first request information if the development server
        # reset normally.  This makes it possible to restart the server
        # without reloader and that stuff from an interactive shell.
        self._got_first_request = False
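The defaulting rules in run() can be isolated into two small functions. This is only a sketch of the logic in the excerpt above; resolve_host_port and fill_debug_defaults are hypothetical helper names, not part of Flask.

```python
def resolve_host_port(host=None, port=None, server_name=None):
    """Apply the same host/port defaults as the run() excerpt (sketch)."""
    if host is None:
        host = "127.0.0.1"
    if port is None:
        # A port embedded in SERVER_NAME wins over the fallback of 5000.
        if server_name and ":" in server_name:
            port = int(server_name.rsplit(":", 1)[1])
        else:
            port = 5000
    return host, port


def fill_debug_defaults(debug, **options):
    """setdefault only fills in keys the caller did not pass explicitly."""
    options.setdefault("use_reloader", debug)
    options.setdefault("use_debugger", debug)
    return options
```

Note that any other keyword, such as threaded or processes, passes through options unchanged, which is what lets it reach run_simple().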
After these checks and defaults, execution enters run_simple(). Here is its source:
def run_simple(hostname, port, application, use_reloader=False,
               use_debugger=False, use_evalex=True,
               extra_files=None, reloader_interval=1,
               reloader_type='auto', threaded=False,
               processes=1, request_handler=None, static_files=None,
               passthrough_errors=False, ssl_context=None):
    """Start a WSGI application. Optional features include a reloader,
    multithreading and fork support.

    This function has a command-line interface too::

        python -m werkzeug.serving --help

    .. versionadded:: 0.5
       `static_files` was added to simplify serving of static files as well
       as `passthrough_errors`.

    .. versionadded:: 0.6
       support for SSL was added.

    .. versionadded:: 0.8
       Added support for automatically loading a SSL context from certificate
       file and private key.

    .. versionadded:: 0.9
       Added command-line interface.

    .. versionadded:: 0.10
       Improved the reloader and added support for changing the backend
       through the `reloader_type` parameter.  See :ref:`reloader`
       for more information.

    :param hostname: The host for the application.  eg: ``'localhost'``
    :param port: The port for the server.  eg: ``8080``
    :param application: the WSGI application to execute
    :param use_reloader: should the server automatically restart the python
                         process if modules were changed?
    :param use_debugger: should the werkzeug debugging system be used?
    :param use_evalex: should the exception evaluation feature be enabled?
    :param extra_files: a list of files the reloader should watch
                        additionally to the modules.  For example configuration
                        files.
    :param reloader_interval: the interval for the reloader in seconds.
    :param reloader_type: the type of reloader to use.  The default is
                          auto detection.  Valid values are ``'stat'`` and
                          ``'watchdog'``. See :ref:`reloader` for more
                          information.
    :param threaded: should the process handle each request in a separate
                     thread?
    :param processes: if greater than 1 then handle each request in a new process
                      up to this maximum number of concurrent processes.
    :param request_handler: optional parameter that can be used to replace
                            the default one.  You can use this to replace it
                            with a different
                            :class:`~BaseHTTPServer.BaseHTTPRequestHandler`
                            subclass.
    :param static_files: a list or dict of paths for static files.  This works
                         exactly like :class:`SharedDataMiddleware`, it's actually
                         just wrapping the application in that middleware before
                         serving.
    :param passthrough_errors: set this to `True` to disable the error catching.
                               This means that the server will die on errors but
                               it can be useful to hook debuggers in (pdb etc.)
    :param ssl_context: an SSL context for the connection. Either an
                        :class:`ssl.SSLContext`, a tuple in the form
                        ``(cert_file, pkey_file)``, the string ``'adhoc'`` if
                        the server should automatically create one, or ``None``
                        to disable SSL (which is the default).
    """
    if not isinstance(port, int):
        raise TypeError('port must be an integer')
    if use_debugger:
        from werkzeug.debug import DebuggedApplication
        application = DebuggedApplication(application, use_evalex)
    if static_files:
        from werkzeug.wsgi import SharedDataMiddleware
        application = SharedDataMiddleware(application, static_files)

    def log_startup(sock):
        display_hostname = hostname not in ('', '*') and hostname or 'localhost'
        if ':' in display_hostname:
            display_hostname = '[%s]' % display_hostname
        quit_msg = '(Press CTRL+C to quit)'
        port = sock.getsockname()[1]
        _log('info', ' * Running on %s://%s:%d/ %s',
             ssl_context is None and 'http' or 'https',
             display_hostname, port, quit_msg)

    def inner():
        try:
            fd = int(os.environ['WERKZEUG_SERVER_FD'])
        except (LookupError, ValueError):
            fd = None
        srv = make_server(hostname, port, application, threaded,
                          processes, request_handler,
                          passthrough_errors, ssl_context,
                          fd=fd)
        if fd is None:
            log_startup(srv.socket)
        srv.serve_forever()

    if use_reloader:
        # If we're not running already in the subprocess that is the
        # reloader we want to open up a socket early to make sure the
        # port is actually available.
        if os.environ.get('WERKZEUG_RUN_MAIN') != 'true':
            if port == 0 and not can_open_by_fd:
                raise ValueError('Cannot bind to a random port with enabled '
                                 'reloader if the Python interpreter does '
                                 'not support socket opening by fd.')

            # Create and destroy a socket so that any exceptions are
            # raised before we spawn a separate Python interpreter and
            # lose this ability.
            address_family = select_ip_version(hostname, port)
            s = socket.socket(address_family, socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind(get_sockaddr(hostname, port, address_family))
            if hasattr(s, 'set_inheritable'):
                s.set_inheritable(True)

            # If we can open the socket by file descriptor, then we can just
            # reuse this one and our socket will survive the restarts.
            if can_open_by_fd:
                os.environ['WERKZEUG_SERVER_FD'] = str(s.fileno())
                s.listen(LISTEN_QUEUE)
                log_startup(s)
            else:
                s.close()

        # Do not use relative imports, otherwise "python -m werkzeug.serving"
        # breaks.
        from werkzeug._reloader import run_with_reloader
        run_with_reloader(inner, extra_files, reloader_interval,
                          reloader_type)
    else:
        inner()  # this branch runs by default
Again, after a series of checks, execution reaches inner() by default. inner() is defined inside run_simple(), so it is a closure over run_simple()'s arguments. It calls make_server(); here is the source:
def make_server(host=None, port=None, app=None, threaded=False, processes=1,
                request_handler=None, passthrough_errors=False,
                ssl_context=None, fd=None):
    """Create a new server instance that is either threaded, or forks
    or just processes one request after another.
    """
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and "
                         "multi process server.")
    elif threaded:
        return ThreadedWSGIServer(host, port, app, request_handler,
                                  passthrough_errors, ssl_context, fd=fd)
    elif processes > 1:
        return ForkingWSGIServer(host, port, app, processes, request_handler,
                                 passthrough_errors, ssl_context, fd=fd)
    else:
        return BaseWSGIServer(host, port, app, request_handler,
                              passthrough_errors, ssl_context, fd=fd)
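At this point the picture is clear: to get a multithreaded or multi-process server, set the threaded or processes parameter. Both are passed down from app.run():
app.run(**options) ---> run_simple(threaded, processes) ---> make_server(threaded, processes)
In the Flask version shown here, the development server is single-threaded and single-process by default; to turn threading on, pass the parameter to run: app.run(threaded=True). (Since Flask 1.0 the development server is threaded by default.)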
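make_server shows that Werkzeug, which provides Flask's development server, offers three server classes: ThreadedWSGIServer, ForkingWSGIServer and BaseWSGIServer. With the default arguments, BaseWSGIServer is used.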
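The selection rule in make_server can be condensed into a few lines. The sketch below returns class names as strings so it runs without Werkzeug installed; choose_server_class is a hypothetical helper written for this illustration.

```python
def choose_server_class(threaded=False, processes=1):
    """Return the name of the server class make_server would pick (sketch)."""
    if threaded and processes > 1:
        # Same restriction as the original: threads and forks are exclusive.
        raise ValueError("cannot have a multithreaded and multi process server.")
    if threaded:
        return "ThreadedWSGIServer"
    if processes > 1:
        return "ForkingWSGIServer"
    return "BaseWSGIServer"
```

For example, choose_server_class(threaded=True) selects the threaded server, while passing both threaded=True and processes=4 raises ValueError.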
Taking threads as the example, look at the ThreadedWSGIServer class:
class ThreadedWSGIServer(ThreadingMixIn, BaseWSGIServer):  # inherits from ThreadingMixIn and BaseWSGIServer
    """A WSGI server that does threading."""
    multithread = True
    daemon_threads = True

ThreadingMixIn = socketserver.ThreadingMixIn

class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()
process_request simply starts a new thread for every incoming request.
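The same mix-in can be tried with nothing but the standard library. The following is a minimal sketch, not Werkzeug's server: EchoIdentHandler, ThreadedTCPServer and collect_handler_thread_ids are hypothetical names, and a plain TCP server stands in for the WSGI server. Each handler reports the id of the thread it runs in and then waits, so the connections overlap.

```python
import socket
import socketserver
import threading


class EchoIdentHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # Report which thread is serving this connection.
        self.request.sendall(str(threading.get_ident()).encode() + b"\n")
        # Block until the client sends a byte (or closes), keeping this
        # handler thread alive while other connections are served.
        self.request.recv(1)


class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    daemon_threads = True
    allow_reuse_address = True


def collect_handler_thread_ids(n_clients=3):
    # Port 0 asks the OS for any free port on the loopback interface.
    with ThreadedTCPServer(("127.0.0.1", 0), EchoIdentHandler) as server:
        threading.Thread(target=server.serve_forever, daemon=True).start()
        clients, idents = [], []
        for _ in range(n_clients):
            conn = socket.create_connection(server.server_address)
            clients.append(conn)
            with conn.makefile("r") as reader:
                idents.append(int(reader.readline()))
        # Release the handlers and stop the accept loop.
        for conn in clients:
            conn.sendall(b"x")
            conn.close()
        server.shutdown()
    return idents
```

Because all handlers are still blocked when the ids are collected, collect_handler_thread_ids() returns one distinct thread id per connection. With a plain socketserver.TCPServer instead, the second connection would not be served until the first handler returned.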
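Finally, here is a very small application to verify the above:
from flask import Flask
from flask import _request_ctx_stack

app = Flask(__name__)

@app.route('/')
def index():
    # Print the id of the thread handling this request.
    print(_request_ctx_stack._local.__ident_func__())
    # Deliberately block forever so the handling thread never exits.
    while True:
        pass
    return '<h2>hello</h2>'

app.run()  # to enable multithreading, use app.run(threaded=True)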
_request_ctx_stack._local.__ident_func__() is the get_ident() function, which returns the id of the current thread. Why follow it with while True? Look at the documentation of get_ident():
Return a non-zero integer that uniquely identifies the current thread amongst other threads that exist simultaneously. This may be used to identify per-thread resources. Even though on some platforms threads identities may appear to be allocated consecutive numbers starting at 1, this behavior should not be relied upon, and the number should be seen purely as a magic cookie. A thread's identity may be reused for another thread after it exits.
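The key sentence is the last one: a thread's id may be reused once that thread has exited. That is why the route function contains an infinite loop: it blocks each request so that the handling threads stay alive and their different ids can be observed. Two outcomes are possible:
1. With multithreading off, the first request blocks the server, and every later request blocks behind it.
2. With multithreading on, every request prints a different thread id.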
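The reuse rule can be checked in isolation with the standard library. In this sketch (both function names are hypothetical), the first function keeps all threads alive at once, so their ids must differ; the second runs threads one after another, in which case the ids may or may not repeat depending on the platform.

```python
import threading


def idents_of_overlapping_threads(n=4):
    idents = []
    lock = threading.Lock()
    # n workers plus the caller must all arrive before anyone proceeds,
    # which guarantees that all workers are alive at the same moment.
    all_alive = threading.Barrier(n + 1)

    def worker():
        with lock:
            idents.append(threading.get_ident())
        all_alive.wait()

    threads = [threading.Thread(target=worker) for _ in range(n)]
    for t in threads:
        t.start()
    all_alive.wait()
    for t in threads:
        t.join()
    return idents


def idents_of_sequential_threads(n=4):
    idents = []
    for _ in range(n):
        t = threading.Thread(target=lambda: idents.append(threading.get_ident()))
        t.start()
        t.join()  # the thread has exited, so its id is free for reuse
    return idents
```

The overlapping case corresponds to the blocked route function above: since no handler thread ever finishes, each request is guaranteed a fresh id.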
Results:
Case 1 (multithreading off):
Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
139623180527360
Case 2 (multithreading on):
Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
140315469436672
140315477829376
140315486222080
140315316901632
140315105163008
140315096770304
140315088377600
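The difference is plain: one id in the first case, a new id per request in the second.
To sum up: Flask supports multithreading, but in the version examined here it is not enabled by default. Also, app.run() is only suitable for development; in production, use a web server such as uWSGI or Gunicorn.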
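As an illustration of the production side, Gunicorn reads its settings from a Python file. The fragment below is only a sketch: the file name gunicorn.conf.py, the address and the worker counts are assumed values, not recommendations from this article.

```python
# gunicorn.conf.py (assumed file name; all values are illustrative)
bind = "127.0.0.1:8000"  # address Gunicorn listens on
workers = 4              # number of worker processes
threads = 2              # threads per worker process
```

It would be started with something like gunicorn -c gunicorn.conf.py app:app, where app:app assumes a module app.py that exposes the Flask object under the name app.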
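Further notes:
Flask: multithreading or multiprocessing?
In the versions discussed here, Flask's development server runs as a single process with a single blocking thread. When a project goes live, it can be deployed with nginx + gunicorn.
But how do you test concurrent requests during development, for example against a deliberately slow view? It is very simple:
app.run() accepts two parameters, threaded and processes, which enable thread support and process support respectively.
1. threaded: multithreading support; defaults to False, i.e. threading is off.
2. processes: number of processes; defaults to 1.
To enable:
if __name__ == '__main__':
    app.run(threaded=True)
    # app.run(processes=4)
Note: choose either multiple processes or multiple threads, not both. As the make_server source above shows, passing threaded=True together with processes > 1 raises ValueError.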