星尘

任汝千圣现,我有天真佛

  • 首页
  • 归档
  • 分类
  • 标签
  • 瞎扯
  • 关于
  • 搜索

Go 学习资料

发表于 2020-12-22 | 分类于 Go

Go 语言学习资料汇总

名称 网址 属性 特点
Go 编程时光 https://golang.iswbm.com/ 基础入门
Go语言入门教程 http://c.biancheng.net/golang/ 基础入门 部分收费
易百教程 - Go 系列 https://www.yiibai.com/go/ 基础入门
TopGoer 教程 http://www.topgoer.com/ 网络、并发、Web
The Way to Go https://learnku.com/docs/the-way-to-go the way to go 中文
Go 语言简明教程 https://geektutu.com/post/quick-golang.html go,gin,rpc 等
gin 中文文档 https://github.com/skyhee/gin-doc-cn web 框架
beego 开发文档 https://www.kancloud.cn/hello123/beego/126086 web 框架
echo 中文文档 https://www.bookstack.cn/read/go-echo/README.md web 框架
Iris 中文文档 https://studyiris.com/doc/ web 框架
Buffalo 中文文档 https://learnku.com/docs/buffalo-doc-cn web 框架
Revel 中文文档 https://www.bookstack.cn/books/gorevel-manual-zh web 框架
跟煎鱼学Go https://eddycjy.gitbook.io/golang/ 进阶
Go语言圣经 https://books.studygolang.com/gopl-zh/ 进阶
mojotv 进阶系列 https://mojotv.cn/404#Golang 进阶
Go 语言高级编程 https://chai2010.gitbooks.io/advanced-go-programming-book/content/ 进阶
Go 命令教程 https://hyper0x.github.io/go_command_tutorial/#/
https://wiki.jikexueyuan.com/project/go-command-tutorial/0.0.html
进阶
Uber 编程规范 https://www.infoq.cn/article/G6c95VyU5telNXXCC9yO 进阶
CTOLib 码库 https://www.ctolib.com/go/categories/go-guide.html 查询相关资料
GoCN https://gocn.vip/ 技术社区
Go 语言中文网 https://studygolang.com/ 技术社区
Go 夜读 https://talkgo.org/ Go 源码
Go 语言原本 https://changkun.de/golang/ Go 源码
Go 语言设计与实现 https://draveness.me/golang/ Go 源码

DRF 文档

发表于 2020-12-10 | 分类于 Python

http://yanrs.me/drf

Flask restful源码分析

发表于 2020-12-10 | 分类于 Python 进阶

参考地址:https://note.youdao.com/ynoteshare1/index.html?id=4ef343068763a56a10a2ada59a019484&type=note

Flask restful 简单示例如下:

from flask import Flask
from flask_restful import Resource, Api

app = Flask(__name__)
api = Api(app)


class HelloWorld(Resource):
    def get(self):
        return {'hello': 'world'}


api.add_resource(HelloWorld, '/')

if __name__ == '__main__':
    app.run(debug=True)

使用 flask_restful 的 Api 封装 app 对象,并且自定义请求类,使用 add_resource 将路由和类进行链接,并请求对应的方法。在 Flask 中,每个路由对应的是一个方法,flask_restful 对应的是一个类,里面做了怎样的处理呢?add_resource 源码如下:

    def add_resource(self, resource, *urls, **kwargs):
        if self.app is not None:
            self._register_view(self.app, resource, *urls, **kwargs)
        else:
            self.resources.append((resource, urls, kwargs))

_register_view 源码如下:

    def _register_view(self, app, resource, *urls, **kwargs):
        # app 代表当前 app 对象,resource 是路由对应的类
        # 获取 endpoint,如果没有指定则使用路由对应的类的名称
        endpoint = kwargs.pop('endpoint', None) or resource.__name__.lower()
        # self.endpoints 的类型是 set,所有 endpoint 不能重复 
        self.endpoints.add(endpoint)
        # 获取要传递给 resource 类的 args 参数
        resource_class_args = kwargs.pop('resource_class_args', ())
        # 获取要传递给 resource 类的 kwargs 参数
        resource_class_kwargs = kwargs.pop('resource_class_kwargs', {})

        if endpoint in getattr(app, 'view_functions', {}):
            previous_view_class = app.view_functions[endpoint].__dict__['view_class']
            if previous_view_class != resource:
                raise ValueError('This endpoint (%s) is already set to the class %s.' % (endpoint, previous_view_class.__name__))
        # 获取 mediatypes 
        resource.mediatypes = self.mediatypes_method()  # Hacky
        resource.endpoint = endpoint
        # 这里很重要
        # 路由对应的类 resource 继承的是 Resource,Resource 继承自 MethodView 并且重写了 dispatch_request,MethodView 的元类是 MethodViewType 和 View。所以,resource.as_view 调用的是 View 中的 as_view 方法,as_view 方法的返回是一个函数,只不过函数中存在路由对应的类的属性。
        # self.output 的参数是 resource.as_view 的返回值。
        resource_func = self.output(resource.as_view(endpoint, *resource_class_args,
            **resource_class_kwargs))

        for decorator in self.decorators:
            resource_func = decorator(resource_func)

        for url in urls:
            # If this Api has a blueprint
            if self.blueprint:
                # And this Api has been setup
                if self.blueprint_setup:
                    # Set the rule to a string directly, as the blueprint is already
                    # set up.
                    self.blueprint_setup.add_url_rule(url, view_func=resource_func, **kwargs)
                    continue
                else:
                    # Set the rule to a function that expects the blueprint prefix
                    # to construct the final url.  Allows deferment of url finalization
                    # in the case that the associated Blueprint has not yet been
                    # registered to an application, so we can wait for the registration
                    # prefix
                    rule = partial(self._complete_url, url)
            else:
                # If we've got no Blueprint, just build a url with no prefix
                rule = self._complete_url(url, '')
            # 最后还是调用 flask 的 add_url_rule 方法将路由和函数进行关联映射
            app.add_url_rule(rule, view_func=resource_func, **kwargs)

as_view 源码如下:

    @classmethod
    def as_view(cls, name, *class_args, **class_kwargs):
        # cls 是当前路由所对应的 resource 类
        # name 是 endpoint
        # class_args 是要传递给 resource 类的 args 参数
        # class_kwargs 是要传递给 resource 类的 kwargs 参数
        """
        将类转换为可与路由系统一起使用的实际视图函数。内部会动态生成一个函数,该函数将在每个请求上实例化View类,并在其上调用dispatch_request方法。
        """
        def view(*args, **kwargs):
            self = view.view_class(*class_args, **class_kwargs)
            return self.dispatch_request(*args, **kwargs)

        if cls.decorators:
            view.__name__ = name
            view.__module__ = cls.__module__
            for decorator in cls.decorators:
                view = decorator(view)
        # view.view_class 就是当前 resource 
        view.view_class = cls
        view.__name__ = name
        # 将当前 resource 的 __doc__,__module__,methods 赋给函数的对应的属性
        view.__doc__ = cls.__doc__
        view.__module__ = cls.__module__
        view.methods = cls.methods
        view.provide_automatic_options = cls.provide_automatic_options
        # 返回一个函数,且这个函数里面调用了当前 resource 的 dispatch_request 方法
        return view

因为 HelloWorld 类是继承 Resource 类的,所以在 as_view 中调用的 dispatch_request 方法就会调用到 Resource 中被重写的 dispatch_request 方法

class HelloWorld(Resource):
    def get(self):
        return {'hello': 'world'}

Resource 中 dispatch_request 源码如下:

class Resource(MethodView):
    representations = None
    method_decorators = []

    def dispatch_request(self, *args, **kwargs):
        # 获取当前请求方法,并寻找类中对应的方法
        meth = getattr(self, request.method.lower(), None)
        if meth is None and request.method == 'HEAD':
            meth = getattr(self, 'get', None)
        assert meth is not None, 'Unimplemented method %r' % request.method
        # 获取方法的装饰器
        if isinstance(self.method_decorators, Mapping):
            decorators = self.method_decorators.get(request.method.lower(), [])
        else:
            decorators = self.method_decorators
        # 将所有装饰器装饰到方法上
        for decorator in decorators:
            meth = decorator(meth)
        # 执行请求所对应的方法
        resp = meth(*args, **kwargs)
        # 如果方法对应返回就是一个 Response 对象,那么就直接返回
        if isinstance(resp, ResponseBase):  # There may be a better way to test
            return resp
        # 否则构建返回
        representations = self.representations or OrderedDict()

        #noinspection PyUnresolvedReferences
        mediatype = request.accept_mimetypes.best_match(representations, default=None)
        if mediatype in representations:
            data, code, headers = unpack(resp)
            resp = representations[mediatype](data, code, headers)
            resp.headers['Content-Type'] = mediatype
            return resp

        return resp

具体的方法执行之后,就会回到外层的 self.output 函数,self.output 源码如下:

def output(self, resource):
    # 这里的 resource 是 resource.as_view 返回的函数
    @wraps(resource)
    def wrapper(*args, **kwargs):
        resp = resource(*args, **kwargs)
        if isinstance(resp, ResponseBase):  # There may be a better way to test
            return resp
        data, code, headers = unpack(resp)
        return self.make_response(data, code, headers=headers)
       # output 返回的也是一个函数,准确来说 output 是一个装饰器
    return wrapper

启动流程:

  1. add_resource 调用 _register_view
  2. _register_view 调用 as_view 方法,as_view 返回 view 一个函数。资源类和 view 函数进行绑定。并且 view 中要调用 对应的 dispatch_request 方法
  3. 使用 output 装饰器对上述返回的函数进行装饰,返回一个被装饰的函数
  4. 调用 Flask add_url_rule 方法将上述返回的被装饰函数和路径进行绑定
  5. 执行 Flask 启动流程

调用流程

  1. Flask 请求进行,找到路由对应的函数
  2. 执行 output 函数,调用 view 函数
  3. view 函数中调用对应的 dispatch_request 函数
  4. 执行 Resource 中重写的 dispatch_request 函数,将请求转换为调用类中对应的方法
  5. 执行请求对应的目标方法
  6. 回到 output 方法,调用 make_response 构建返回
  7. 执行 Flask 返回流程

Flask 源码分析

发表于 2020-12-10 | 分类于 Python 进阶

参考文章: http://www.zchengjoey.com/posts/flask%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/

WSGI

在说到wsgi时我们先看一下面向http的python程序需要关心的内容:

  1. 请求:
    • 请求方法(method)
    • 请求地址(url)
    • 请求内容(body)
    • 请求头(header)
    • 请求环境(environ)
  2. 响应:
    • 响应码(status_code)
    • 响应数据(data)
    • 响应头(header)

wsgi要做的就是关于程序端和服务端的标准规范, 将服务程序接收到的请i去传递给python程序, 并将网络的数据流和python的结构体进行转换.
它规定了python程序必须是一个可调用对象(实现了call函数的方法或类), 接受两个参数environ(WSGI的环境信息)和start_response(开始响应请求的函数), 并返回可迭代的结果. 直接上代码来实现一个最简单的web程序返回hello world:

from werkzeug.serving import run_simple

class WebClass:
    def __init__(self):
        pass

    def __call__(self, environ, start_response):
        status = '200 OK'
        response_headers = [('Content-type', 'text/plain')]
        start_response(status, response_headers)
        yield str.encode("Hello World!\n")

if __name__ == "__main__":
    app = WebClass()
    run_simple("127.0.0.1", 5000, app)

WebClass正是实现了__call__方法的可调用对象, 接受environ和start_respone, 并在返回之前调用start_response, start_response接受两个必须的参数, status_code(http状态码)和response_header(响应头), yield hello world正是要求的可迭代结果, 现在这个类只是实现了最简单的功能, 路由注册, 模板渲染等都没有实现.这里用了werkzeug提供的run_simple, 其实我们创建flask应用, 跑起来的时候调用的也是这个函数,后面将会讲到。

项目运行

app.run(), 源码如下:

    def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):
        # 是否是 cli 启动
        if os.environ.get("FLASK_RUN_FROM_CLI") == "true":
            from .debughelpers import explain_ignored_app_run

            explain_ignored_app_run()
            return

        if get_load_dotenv(load_dotenv):
            cli.load_dotenv()

            # if set, let env vars override previous values
            if "FLASK_ENV" in os.environ:
                self.env = get_env()
                self.debug = get_debug_flag()
            elif "FLASK_DEBUG" in os.environ:
                self.debug = get_debug_flag()

        # debug passed to method overrides all other sources
        if debug is not None:
            self.debug = bool(debug)

        _host = "127.0.0.1"
        _port = 5000
        server_name = self.config.get("SERVER_NAME")
        sn_host, sn_port = None, None

        if server_name:
            sn_host, _, sn_port = server_name.partition(":")

        host = host or sn_host or _host
        # pick the first value that's not None (0 is allowed)
        port = int(next((p for p in (port, sn_port) if p is not None), _port))

        options.setdefault("use_reloader", self.debug)
        options.setdefault("use_debugger", self.debug)
        # 默认是多线程启动的
        options.setdefault("threaded", True)
        # 显示 banner 信息
        cli.show_server_banner(self.env, self.debug, self.name, False)

        from werkzeug.serving import run_simple

        try:
            # 运行 run_simple
            run_simple(host, port, self, **options)
        finally:
            self._got_first_request = False

上面函数的功能很简单,处理了以下参数,其中最主要的还是调用 werkzeug 的 run_simple 函数, run_simple 源码如下:

def run_simple(
    hostname,
    port,
    application,
    use_reloader=False,
    use_debugger=False,
    use_evalex=True,
    extra_files=None,
    reloader_interval=1,
    reloader_type="auto",
    threaded=False,
    processes=1,
    request_handler=None,
    static_files=None,
    passthrough_errors=False,
    ssl_context=None,
):

    if not isinstance(port, int):
        raise TypeError("port must be an integer")
    if use_debugger:
        from .debug import DebuggedApplication

        application = DebuggedApplication(application, use_evalex)
    if static_files:
        from .middleware.shared_data import SharedDataMiddleware

        application = SharedDataMiddleware(application, static_files)
    # 启动后的显示信息,包含运行地址,端口等
    def log_startup(sock):
        display_hostname = hostname if hostname not in ("", "*") else "localhost"
        quit_msg = "(Press CTRL+C to quit)"
        if sock.family == af_unix:
            _log("info", " * Running on %s %s", display_hostname, quit_msg)
        else:
            if ":" in display_hostname:
                display_hostname = "[%s]" % display_hostname
            port = sock.getsockname()[1]
            _log(
                "info",
                " * Running on %s://%s:%d/ %s",
                "http" if ssl_context is None else "https",
                display_hostname,
                port,
                quit_msg,
            )

    def inner():
        try:
            fd = int(os.environ["WERKZEUG_SERVER_FD"])
        except (LookupError, ValueError):
            fd = None
        # 获取一个 wsgi 对象
        srv = make_server(
            hostname,
            port,
            application,
            threaded,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )
        if fd is None:
            log_startup(srv.socket)
        # 调用这个 wsgi 对象的 serve_forever 方法,让其项目处于运行状态
        srv.serve_forever()

    if use_reloader:
        if not is_running_from_reloader():
            if port == 0 and not can_open_by_fd:
                raise ValueError(
                    "Cannot bind to a random port with enabled "
                    "reloader if the Python interpreter does "
                    "not support socket opening by fd."
                )
            address_family = select_address_family(hostname, port)
            server_address = get_sockaddr(hostname, port, address_family)
            s = socket.socket(address_family, socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind(server_address)
            if hasattr(s, "set_inheritable"):
                s.set_inheritable(True)
            if can_open_by_fd:
                os.environ["WERKZEUG_SERVER_FD"] = str(s.fileno())
                s.listen(LISTEN_QUEUE)
                log_startup(s)
            else:
                s.close()
                if address_family == af_unix:
                    _log("info", "Unlinking %s" % server_address)
                    os.unlink(server_address)
        from ._reloader import run_with_reloader

        run_with_reloader(inner, extra_files, reloader_interval, reloader_type)
    else:
        # 调用 inner 函数
        inner()

run_simple 中最后调用了其中的 inner 函数,inner 中的 make_server 函数返回一个 WSGIServer 对象,然后再调用 WSGIServer 对象的 serve_forever 方法,创建 wsgi 的服务, 然后运行,这样项目就运行起来了。

请求处理

前面说了wsgi规定应用程序必须实现__call__方法, 找到Flask对应的内容:

    def __call__(self, environ, start_response):
        return self.wsgi_app(environ, start_response)

wsgi_app 源码如下:

    def wsgi_app(self, environ, start_response):
        # 获取 RequestContext 对象
        ctx = self.request_context(environ)
        error = None
        try:
            try:
                ctx.push()
                # full_dispatch_request 中做了很多事情,包含请求钩子处理,请求处理等
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            ctx.auto_pop(error)

full_dispatch_request 源码如下:

def full_dispatch_request(self):
    self.try_trigger_before_first_request_functions()
    try:
        request_started.send(self)
        # 判断是否有 @before_first_request,@before_request 装饰的钩子函数,如果有调用
        rv = self.preprocess_request()
        if rv is None:
            rv = self.dispatch_request()
    except Exception as e:
        rv = self.handle_user_exception(e)
     # 将请求结果通过钩子函数处理(如果有 @after_request 装饰的函数,那么就处理,不管请求是否异常都要执行的 teardown_request 函数)
    return self.finalize_request(rv)

这段最核心的就是 dispatch_request , dispatch_request 就是我们注册的路由函数的执行结果, 在 dispatch_request 之前我们看到 preprocess_request , 它的作用是将钩子函数处理一下

  1. 第一次请求处理之前的钩子函数, 通过 before_first_request 定义的
  2. 每个请求处理之前的钩子函数, 通过 before_request 定义的

而在 dispat_request 之后还有 finalize_request 函数, 它的作用同样是将请求结果通过钩子函数处理一下:

  1. 每个请求正常处理之后的 hook 函数,通过 after_request 定义
  2. 不管请求是否异常都要执行的 teardown_request hook 函数 所以上面最重要的就是 dispatch_request 函数, 找到我们注册的路由函数, 并返回

preprocess_request 源码如下:

    def preprocess_request(self):
        # 获取当前请求对象的 blueprint
        bp = _request_ctx_stack.top.request.blueprint

        funcs = self.url_value_preprocessors.get(None, ())
        if bp is not None and bp in self.url_value_preprocessors:
            funcs = chain(funcs, self.url_value_preprocessors[bp])
        for func in funcs:
            func(request.endpoint, request.view_args)

        funcs = self.before_request_funcs.get(None, ())
        if bp is not None and bp in self.before_request_funcs:
            funcs = chain(funcs, self.before_request_funcs[bp])
        for func in funcs:
            rv = func()
            if rv is not None:
                return rv

finalize_request 源码如下:

def finalize_request(self, rv, from_error_handler=False):
    # 创建一个 response 对象
    response = self.make_response(rv)
    try:
        # 处理返回前的钩子函数
        response = self.process_response(response)
        request_finished.send(self, response=response)
    except Exception:
        if not from_error_handler:
            raise
        self.logger.exception(
            "Request finalizing failed with an error while handling an error"
        )
    return response

process_response 源码如下:

   def process_response(self, response):
        ctx = _request_ctx_stack.top
        bp = ctx.request.blueprint
        funcs = ctx._after_request_functions
        if bp is not None and bp in self.after_request_funcs:
            funcs = chain(funcs, reversed(self.after_request_funcs[bp]))
        if None in self.after_request_funcs:
            funcs = chain(funcs, reversed(self.after_request_funcs[None]))
        for handler in funcs:
            response = handler(response)
        if not self.session_interface.is_null_session(ctx.session):
            self.session_interface.save_session(self, ctx.session, response)
        return response

响应

make_response 源码如下:

def make_response(self, rv):
    status = headers = None
    # 如果返回是一个 tuple
    if isinstance(rv, tuple):
        len_rv = len(rv)

        # 返回的长度为 3,那么分别就是 response 内容,http status,http headers
        if len_rv == 3:
            rv, status, headers = rv
        # 如果返回长度只有两个
        elif len_rv == 2:
            # 且最后一个是 dict, tuple, list 格式的,那么返回是 response 内容和 headers
            if isinstance(rv[1], (Headers, dict, tuple, list)):
                rv, headers = rv
            else:
            # 否则返回内容就是 response 内容和 status
                rv, status = rv
        # 其他格式则不支持
        else:
            raise TypeError(
                "The view function did not return a valid response tuple."
                " The tuple must have the form (body, status, headers),"
                " (body, status), or (body, headers)."
            )

    # 返回响应内容则不支持
    if rv is None:
        raise TypeError(
            "The view function did not return a valid response. The"
            " function either returned None or ended without a return"
            " statement."
        )

    # 确保 response 是响应类的实例
    if not isinstance(rv, self.response_class):
        if isinstance(rv, (text_type, bytes, bytearray)):
            rv = self.response_class(rv, status=status, headers=headers)
            status = headers = None
        elif isinstance(rv, dict):
            rv = jsonify(rv)
        elif isinstance(rv, BaseResponse) or callable(rv):
            try:
                rv = self.response_class.force_type(rv, request.environ)
            except TypeError as e:
                new_error = TypeError(
                    "{e}\nThe view function did not return a valid"
                    " response. The return type must be a string, dict, tuple,"
                    " Response instance, or WSGI callable, but it was a"
                    " {rv.__class__.__name__}.".format(e=e, rv=rv)
                )
                reraise(TypeError, new_error, sys.exc_info()[2])
        else:
            raise TypeError(
                "The view function did not return a valid"
                " response. The return type must be a string, dict, tuple,"
                " Response instance, or WSGI callable, but it was a"
                " {rv.__class__.__name__}.".format(rv=rv)
            )

    if status is not None:
        if isinstance(status, (text_type, bytes, bytearray)):
            rv.status = status
        else:
            rv.status_code = status
    if headers:
        rv.headers.extend(headers)
    return rv

路由匹配

在 flask 中, 构建路由规则有两种方法, 这两种方法其实是一样的,都是调用 add_url_rule 来实现

  1. 通过`@app.route()`的装饰器, 上面例子用的就是这种方法
  2. 通过app.add_url_rule, 这个方法的签名为 add_url_rule(self, rule, endpoint=None, view_func=None, **options)
def route(self, rule, **options):
 def decorator(f):
  endpoint = options.pop("endpoint", None)
  self.add_url_rule(rule, endpoint, f, **options)
  return f
 return decorator

add_url_rule 源码如下:

@setupmethod
    def add_url_rule(
        self,
        rule,
        endpoint=None,
        view_func=None,
        provide_automatic_options=None,
        **options
    ):
        # 如果没有设置 endpoint,那么 endpoint 就是函数名称
        if endpoint is None:
            endpoint = _endpoint_from_view_func(view_func)
        options["endpoint"] = endpoint

        # 获取函数对应的方法
        methods = options.pop("methods", None)
        if methods is None:
            methods = getattr(view_func, "methods", None) or ("GET",)
        if isinstance(methods, string_types):
            raise TypeError(
                "Allowed methods have to be iterables of strings, "
                'for example: @app.route(..., methods=["POST"])'
            )
        methods = set(item.upper() for item in methods)

        # 获取 required_methods
        required_methods = set(getattr(view_func, "required_methods", ()))
        if provide_automatic_options is None:
            provide_automatic_options = getattr(
                view_func, "provide_automatic_options", None
            )
        if provide_automatic_options is None:
            if "OPTIONS" not in methods:
                provide_automatic_options = True
                required_methods.add("OPTIONS")
            else:
                provide_automatic_options = False

        # 全部 methods
        methods |= required_methods

        # 获取一个 rule,底层使用的是 werkzeug.routing 的 Rule 类对象
        rule = self.url_rule_class(rule, methods=methods, **options)
        rule.provide_automatic_options = provide_automatic_options

        # 将路由添加进入 view_functions 中,底层使用的是 werkzeug.routing 的 Map 类对象
        self.url_map.add(rule)
        if view_func is not None:
            old_func = self.view_functions.get(endpoint)
            if old_func is not None and old_func != view_func:
                raise AssertionError(
                    "View function mapping is overwriting an "
                    "existing endpoint function: %s" % endpoint
                )
            self.view_functions[endpoint] = view_func

可以发现这个函数主要做的就是更新 app 的 url_map 和 view_functions 这两个变量.查找定义, 发现 url_map 是 werkzeug.routing 的Map 类对象, rule 是 werkzeug.routing 的 Rule 类对象, view_functions 就是一个字典, 从上我们也可以知道每个视图函数的 endpoint 必须是不同的.也可以发现, flask 的核心路由逻辑其实实在 werkzeug 中实现的。

dispatch_request 源码如下,在这里面将路由和要处理的函数结合起来,实现执行视图函数。

def dispatch_request(self):
    # 获取当前请求
    req = _request_ctx_stack.top.request
    if req.routing_exception is not None:
        self.raise_routing_exception(req)
    # 获取当前请求的 rule
    rule = req.url_rule
    if (
        getattr(rule, "provide_automatic_options", False)
        and req.method == "OPTIONS"
    ):
        return self.make_default_options_response()
    # 通过 endpoint 匹配视图函数并且执行
    return self.view_functions[rule.endpoint](**req.view_args)

上下文

之前在上面我们已经讲到dispatch_request函数在找到view_function后, 只是将最基本的参数传给了view_function, 可是有时这对视图函数来说是远远不够的, 它有时还需要头部(header), body里的数据, 才能正确运行, 可能 最简单的方法就是将所有的这些信息封装成一个对象, 作为参数传给视图函数, 可是这样一来所有的视图函数都需要添加对应的参数, 即使并没有用到它.

flask 的做法是把这些信息作为上下文, 类似全局变量的东西, 在需要的时候, 用 from flask import request 获取, 比如经常用的request.json, request.args, 这里有一个很重要的点就是它们必须是动态的, 在多线程或多协程的情况下, 每个线程或协程获取的必须是自己独特的对象, 不能导入后结果获取的是其他请求的内容, 那就乱套了.

那么flask是如何实现不同的线程协程准确获得自己的上下文的呢, 我们先来看一下这两个上下文的定义:

application context 演化成出两个变量 current_app 和 g

request context 演化出 request 和 session

他们的实现正式依靠 Local Stack 和 Local Proxy 类, 正是这两个东西才让我们在并发程序中每个视图函数都会看到属于自己的上下文而不会混乱, 而这两个类能在多线程或多协程情况下实现隔离效果是考了另一个基础类 Local, 实现了类似threading.local的效果

def _lookup_req_object(name):
    top = _request_ctx_stack.top
    if top is None:
        raise RuntimeError(_request_ctx_err_msg)
    return getattr(top, name)

def _lookup_app_object(name):
    top = _app_ctx_stack.top
    if top is None:
        raise RuntimeError(_app_ctx_err_msg)
    return getattr(top, name)

def _find_app():
    top = _app_ctx_stack.top
    if top is None:
        raise RuntimeError(_app_ctx_err_msg)
    return top.app

_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
current_app = LocalProxy(_find_app)
request = LocalProxy(partial(_lookup_req_object, "request"))
session = LocalProxy(partial(_lookup_req_object, "session"))
g = LocalProxy(partial(_lookup_app_object, "g"))

Local 代码如下:

__storage__ 是用于存储内容的地方,格式为 {“线程ID”: {“key”:”value”}},因为每个请求线程 ID 不一样,所以 Local 实现了隔离的效果。__ident_func__ 绑定的是 get_ident 函数,作用是获取当前的线程 ID。

class Local(object):
    # 设置只有这两个属性可以被外部访问
    __slots__ = ("__storage__", "__ident_func__")

    def __init__(self):
        # 设置 __storage__ 的初始化值
        object.__setattr__(self, "__storage__", {})
        # 设置 __ident_func__ 的值,get_ident 获得当前线程的 id
        object.__setattr__(self, "__ident_func__", get_ident)

    # 遍历 Local
    def __iter__(self):
        return iter(self.__storage__.items())

    def __call__(self, proxy):
        """Create a proxy for a name."""
        return LocalProxy(self, proxy)

    # 释放
    def __release_local__(self):
        self.__storage__.pop(self.__ident_func__(), None)
    # 获取
    # 下面三个方法实现了属性的访问,设置和删除
    # 内部都调用了get_ident方法, 获取当前的线程或协程id, 然后一次为键访问值
    # 这样外部只是看到访问实例的属性, 其实内部已经实现了线程或协程的切换
    def __getattr__(self, name):
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)
    # 设置
    def __setattr__(self, name, value):
        ident = self.__ident_func__()
        storage = self.__storage__
        try:
            storage[ident][name] = value
        except KeyError:
            storage[ident] = {name: value}
    # 删除
    def __delattr__(self, name):
        try:
            del self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

Local 是用来提供多线程或多协程的隔离属性访问的, 那么 Local Stack 就提供了隔离的栈访问, 它只要提供了 push, pop, top方法, 主要是栈的一些方法,在 LocalStack 的 push 方法中, 其实是对属性_local也就是 Local 的操作, 也就是先创建一个列表, self._local.storage[ident(当前线程或协程id)]['stack'] = [], 然后其实还是用 append 将 request 请求信息添加进去

class LocalStack(object):
    def __init__(self):
        self._local = Local()

    def __release_local__(self):
        self._local.__release_local__()

    def _get__ident_func__(self):
        return self._local.__ident_func__

    def _set__ident_func__(self, value):
        object.__setattr__(self._local, '__ident_func__', value)
    __ident_func__ = property(_get__ident_func__, _set__ident_func__)
    del _get__ident_func__, _set__ident_func__

    def __call__(self):
        def _lookup():
            rv = self.top
            if rv is None:
                raise RuntimeError('object unbound')
            return rv
        return LocalProxy(_lookup)

    # push, pop, top 实现了栈的操作
    def push(self, obj):
        rv = getattr(self._local, 'stack', None)
        if rv is None:
            self._local.stack = rv = []
        rv.append(obj)
        return rv

    def pop(self):
        stack = getattr(self._local, 'stack', None)
        if stack is None:
            return None
        elif len(stack) == 1:
            release_local(self._local)
            return stack[-1]
        else:
            return stack.pop()

    @property
    def top(self):
        try:
            return self._local.stack[-1]
        except (AttributeError, IndexError):
            return None

上述已经将 Local 和 LocalStack 讲的差不多了。request ,g 的实现,用的是 LocalProxy,LocalProxy 还重写了所有的魔术方法,具体实现都是代理对象,LocalProxy 简要代码如下:

@implements_bool
class LocalProxy(object):
    __slots__ = ('__local', '__dict__', '__name__', '__wrapped__')

    def __init__(self, local, name=None):
        object.__setattr__(self, '_LocalProxy__local', local)
        object.__setattr__(self, '__name__', name)
        if callable(local) and not hasattr(local, '__release_local__'):
            # "local" 是一个回调函数
            object.__setattr__(self, '__wrapped__', local)
    # 获取当前线程或协程对应的对象
    def _get_current_object(self):
        if not hasattr(self.__local, '__release_local__'):
            return self.__local()
        try:
            return getattr(self.__local, self.__name__)
        except AttributeError:
            raise RuntimeError('no object bound to %s' % self.__name__)

_request_ctx_stack 代表的就是请求上下文,ctx 其实是 RequestContext,请求来的时候会调用 RequestContext 的 push 方法

def push(self):
        # app_ctx 存放的是 Flask 应用实例,如果不存在,且不是当前的应用实例,那么就创建一个,在进行 push 操作
        app_ctx = _app_ctx_stack.top
        if app_ctx is None or app_ctx.app != self.app:
            app_ctx = self.app.app_context()
            app_ctx.push()
            self._implicit_app_ctx_stack.append(app_ctx)
        else:
            self._implicit_app_ctx_stack.append(None)

        if hasattr(sys, "exc_clear"):
            sys.exc_clear()
        # push 当前 RequestContext 对象到 _request_ctx_stack 中
        _request_ctx_stack.push(self)
        if self.session is None:
            session_interface = self.app.session_interface
            self.session = session_interface.open_session(self.app, self.request)

            if self.session is None:
                self.session = session_interface.make_null_session(self.app)

        if self.url_adapter is not None:
            self.match_request()

    def pop(self, exc=_sentinel):
        app_ctx = self._implicit_app_ctx_stack.pop()

        try:
            clear_request = False
            if not self._implicit_app_ctx_stack:
                self.app.do_teardown_request(exc)

                request_close = getattr(self.request, 'close', None)
                if request_close is not None:
                    request_close()
                clear_request = True
        finally:
            rv = _request_ctx_stack.pop()
            if clear_request:
                rv.request.environ['werkzeug.request'] = None
            if app_ctx is not None:
                app_ctx.pop(exc)

    def auto_pop(self, exc):
        if self.request.environ.get('flask._preserve_context') or \
           (exc is not None and self.app.preserve_context_on_exception):
            self.preserved = True
            self._preserved_exc = exc
        else:
            self.pop(exc)

push 就是将该请求的 application context (如果 _app_ctx_stack 栈顶不是当前请求所在 app,需要重新创建 app context )和 request context 都保存到相关的栈上, pop 则相反, 做一些出栈清理操作。

现在上下文就比较清楚了,就是每次有请求过来,flask 会创建当前线程或协程需要处理的两个上下文,并压入隔离的栈。application context针对的是 flask实 例的, 因为 app 实例只有一个, 所以多个请求其实是公用一个 application context, 而 request context 是每次请求过来都要创建的,在请求结束时又出栈, 所以两个的生命周期时不同的, 也就是 application context 的周期就是实例的生命周期, 而request context 的生命周期取决于请求存在的时间。

flask: app.run() 时, 并没有看到哪里启动了多线程, 理论上在单线程的情况下, 只有一个请求处理完成之后才能处理下一个请求, 那么上面为什么能同时处理多个请求, 哪里创建了多线程呢?
在flask的启动函数(run)中, 有这么一个源码:

options.setdefault("threaded", True)

Python 开发及周边技术介绍

发表于 2020-11-29 | 分类于 Python 培训

发展历程

创始人

Python 的创始人为吉多·范罗苏姆,1989年的圣诞节期间,他为了在打发时间,决心开发一个新的脚本解释编程。之所以选中Python作为编程的名字,是因为他是BBC电视剧——蒙提·派森的飞行马戏团的爱好者。Python 发行时间为 1991 年, 即 29 年前。

2005年12月,吉多·范罗苏姆加入Google。2018年辞去了 Python 的 BDFL(Benevolent Dictator for Life)。2019年10月30日,Guido van Rossum 在推特公布了自己从 Dropbox 公司离职的消息,并表示已经退休。2020年11月13日,64岁的 Python 语言的创建者吉多·范罗苏姆发推表示,退休生活太无聊,决定入职微软。

使用概况

google 搜索趋势

以下为 2004 年至今的 google 中国的 “python” 关键字搜索趋势。可以看出从 2014 年开始,一直处于增长的趋势。

百度搜索趋势

以下为 2011年至今的 百度的 “python” 关键字搜索趋势。可以看出从 2017 年开始,一直处于增长的趋势。

学习路径

官网文档

官网地址: https://www.python.org/ 、官网文档:https://docs.python.org/3/

推荐文档

文档名称 文档地址 推荐指数(10分)
廖雪峰博客 廖雪峰博客 8
菜鸟教程 菜鸟教程 7
github.com/TwoWater/Python github.com/TwoWater/Python 8
知乎: 编程零基础应当如何开始学习 Python 知乎: 编程零基础应当如何开始学习 Python 8

更多文档可参考 Python 博客网站资源

推荐书籍

书籍名称 书籍地址 适用程度
Python编程 Python编程 入门
Python基础教程(第2版) Python基础教程(第2版) 入门
Effective Python Effective Python 进阶
Python Cookbook(第3版) Python Cookbook(第3版) 进阶

更多书籍可参考豆瓣书单

使用领域

问题1:生活中你接触到过 “人工智能” 的产品吗?RPA 涉及上述领域中的哪一类?

问题2:网络爬虫是好是坏?有相关法律约束吗?

问题3:你眼中的 Web 开发是什么样子的?

问题4:你知道 DevOps 吗?能在 github 上搜索出相关的 DevOps 项目吗?

人工智能

感知能力

指的是人类透过感官所收到环境的刺激,察觉消息的能力,简单的说就是人类五官的看、听、说、读、写等能力,学习人类的感知能力是AI目前主要的焦点之一,包括:

  • “看”:电脑视觉(Computer Vision)、图像识别(Image Recognition)、人脸识别(Face Recognition)、对象侦测(Object Detection)。
  • “听”:语音识别(Sound Recognition)。
  • “读”:自然语言处理(Natural Language Processing,NLP)、语音转换文本(Speech-to-Text)。
  • “写”:机器翻译(Machine Translation)。
  • “说”:语音生成(Sound Generation)、文本转换语音(Text-to-Speech)。

认知能力(Cognition)

指的是人类透过学习、判断、分析等等心理活动来了解消息、获取知识的过程与能力,对人类认知的模仿与学习也是目前AI第二个焦点领域,主要包括:

  • 分析识别能力:例如医学图像分析、产品推荐、垃圾邮件识别、法律案件分析、犯罪侦测、信用风险分析、消费行为分析等。
  • 预测能力:例如AI运行的预防性维修(Predictive Maintenance)、智能天然灾害预测与防治。
  • 判断能力:例如AI下围棋、自动驾驶车、健保诈欺判断、癌症判断等。
  • 学习能力:例如机器学习、深度学习、增强式学习等等各种学习方法。

网络爬虫

网络搜索引擎等站点通过爬虫软件更新自身的网站内容或其对其他网站的索引。网络爬虫可以将自己所访问的页面保存下来,以便搜索引擎事后生成索引&action=edit&redlink=1)供用户搜索。爬虫访问网站的过程会消耗目标系统资源。不少网络系统并不默许爬虫工作。因此在访问大量页面时,爬虫需要考虑到规划、负载,还需要讲“礼貌”。 不愿意被爬虫访问、被爬虫主人知晓的公开站点可以使用 robots.txt 文件之类的方法避免访问。这个文件可以要求机器人只对网站的一部分进行索引,或完全不作处理。

Web 开发

Web 开发主要是基于浏览器的应用程序开发,这也是 Web 应用程序开发的基础,比如淘宝、京东、当当网等。Web 开发在近年来,随着本身技术的突破以及移动设备的普及,基于 Web 领域的开发,也出现了明确的岗位职责分工,一个 Web 互联网产品中,基本上会分为 Web UI 设计、Web 前端开发以及 Web后端开发。对于大型的互联网公司,还会分独立的 Web 架构开发组,专门负责 Web 框架的维护更新与迭代。

运维开发

DevOps(Development和Operations的组合词)是一种重视“软件开发人员(Dev)”和“IT运维技术人员(Ops)”之间沟通合作的文化、运动或惯例。透过自动化“软件交付”和“架构变更”的流程,来使得构建、测试、发布软件能够更加地快捷、频繁和可靠。

DevOps的引入能对产品交付、测试、功能开发和维护(包括曾经罕见但如今已屡见不鲜的 “热补丁”)起到意义深远的影响。在缺乏DevOps能力的组织中,开发与运营之间存在着信息“鸿沟”,例如运营人员要求更好的可靠性和安全性,开发人员则希望基础设施响应更快,而业务用户的需求则是更快地将更多的特性发布给最终用户使用。这种信息鸿沟就是最常出问题的地方。

Web 开发

C/S 架构和 B/S 架构

CS 即 Client/Server(客户机/服务器)结构,C/S 结构在技能上非常成熟,它的重要特征就是交互性强、拥有安全的存取形式、网络通信数量低、响应速度快、利于处置大量数据。可是这个结构的程序就是针对性开发,变更不够灵活,维护与管理的难度较大。常常只局限在小型局域网,不利于扩展。而且,因为这个结构的每台客户机全部须要安装相对应的客户端程序,分布功能弱并且兼容性差,不可以完成迅速部署安装与配置,因为这样缺少通用性,拥有比较大的局限性。请求拥有肯定专业水准的技能人员去结束。
BS 即 Browser/Server(浏览器/服务器)结构,就是只安装维护一个服务器(Server),而客户端选用浏览器(Browse)运行软件。B/S 结构应用程序相对于传统的C/S结构应用程序就是一个特别大的进步。 B/S 结构的重要特征就是分布性强、维护方便、开发简单并且共享性强、总体拥有费用低。但数据安全性问题、对服务器需要过高、数据传输速度慢、软件的个性化特征明显减少,这些缺点就是有目共睹的,难以完成传统形式下的特殊功能请求。比如通过浏览器实行大量的数据输入或实行报表的应答、专用性打印输出全部相对比较困难与不便。另外,完成复杂的应用构造有较大的困难。

前后端分离

架构分离的本质: 所谓架构设计,实际上是如何合理的对现实的人力架构进行系统映射,以便最大限度的提高整个公司(组织)的运行效率

任何系统架构设计,实际上是对组织结构在系统上进行映射,前后端分离,就是在对前端开发人员和后端开发人员的工作进行解耦,尽量减少他她们之间的交流成本,帮助他她们更能专注于自己擅长的工作。

REST/GraphQL

REST 指的是一组架构约束条件和原则。满足这些约束条件和原则的应用程序或设计就是 RESTful。

Web 应用程序最重要的 REST 原则是,客户端和服务器之间的交互在请求之间是无状态的。从客户端到服务器的每个请求都必须包含理解请求所必需的信息。如果服务器在请求之间的任何时间点重启,客户端不会得到通知。此外,无状态请求可以由任何可用服务器回答,这十分适合云计算之类的环境。客户端可以缓存数据以改进性能。

GraphQL 对你的 API 中的数据提供了一套易于理解的完整描述,使得客户端能够准确地获得它需要的数据,而且没有任何冗余,也让 API 更容易地随着时间推移而演进,还能用于构建强大的开发者工具。

参考信息:解读GraphQL(一): WHAT & WHY ,GraphQL 官网

前端框架

前端技术日新月异,大致由 Angular、React、Vue 三分天下,Vue.js 是轻量级的开发框架,很适合开发小规模灵活的 Web 应用程序;而 Angular 尽管学习曲线较为陡峭,但却是构建完整复杂应用的好选择。

跨平台解决方案: 目前市场上已经出现了很多跨平台的应用,比如 uni-app,React Native 等。以此来实现一套代码多个平台可以运行。

参考地址:angular.io, 2018前端框架深度抉择, react 中文文档, Vue 中文文档,React Native, uni-app

Python Web 框架

Flask: Flask是一个使用Python编写的轻量级Web应用框架。基于Werkzeug WSGI工具箱和Jinja2 模板引擎。Flask使用BSD授权。2004年,一群来自世界各地的Python热衷者组成了Pocoo。Flask的作者是来自Pocoo的 Armin Ronacher。本来只是作者的一个愚人节玩笑,不过后来大受欢迎,进而成为一个正式的项目。

Django:Django是一个开放源代码的Web应用框架,由Python写成。采用了MVT的软件设计模式,即模型(Model),视图(View)和模板(Template)。它在开发初期用于管理劳伦斯出版集团旗下的一些以新闻为主的网站。

Docker/K8s/K3s

Docker容器与虚拟机类似,但二者在原理上不同。容器是将操作系统层虚拟化,虚拟机则是虚拟化硬件,因此容器更具有便携性、高效地利用服务器。 容器更多的用于表示 软件的一个标准化单元。由于容器的标准化,因此它可以无视基础设施(Infrastructure)的差异,部署到任何一个地方。另外,Docker也为容器提供更强的业界的隔离兼容。

Kubernetes(常简称为K8s)是用于自动部署、扩展和管理“容器化(containerized)应用程序”的开源系统。

k3s 是史上最轻量级Kubernetes。k3s 是 rancher 开源的一个 Kubernetes 发行版,从名字上就可以看出 k3s 相对 k8s 做了很多裁剪和优化,二进制程序不足 50MB,占用资源更少,只需要 512MB 内存即可运行。

微服务

微服务是一种以业务功能为主的服务设计概念,每一个服务都具有自主运行的业务功能,对外开放不受语言限制的 API (最常用的是 HTTP),应用程序则是由一个或多个微服务组成。

微服务的另一个对比是单体式应用程序。单体式应用表示一个应用程序内包含了所有需要的业务功能,并且使用像主从式架构 (Client/Server) 或是多层次架构 (N-tier) 实现,虽然它也是能以分布式应用程序来实现,但是在单体式应用内,每一个业务功能是不可分割的。若要对单体式应用进行扩展则必须将整个应用程序都放到新的运算资源(如:虚拟机) 内,但事实上应用程序中最吃资源、需要运算资源的仅有某个业务部分(例如跑分析报表或是数学算法分析),但因为单体式应用无法分割该部分,因此无形中会有大量的资源浪费的现象。

常见 Web 安全

  1. 购物秒杀你遇到过什么情况?不能支付?没有抢到?
  2. 为什么 12306 的验证码这么复杂,能不能去掉?

  3. 微博出现热点新闻为什么会宕机?

  4. CORS、XSS

Python Flask Web 开发

Flask 的理念是为所有应用建立一个良好的基础,其余的一切都取决于你自己或者扩展。一个最小的 Flask 应用如下:

from flask import Flask
app = Flask(__name__)

@app.route('/')
def hello_world():
    return 'Hello, World!'

参考:Flask 官方文档

参考资料来源

github: https://github.com/

angular.io:https://angular.io/

知乎:https://www.zhihu.com/

GraphQL 官网:https://graphql.cn/

维基百科:https://zh.wikipedia.org/

Vue 中文文档:https://cn.vuejs.org/

菜鸟教程:https://www.runoob.com/

uni-app:https://uniapp.dcloud.net.cn/

React Native:https://www.reactnative.cn/

react 中文文档:https://react.docschina.org/

Flask 官方文档:https://dormousehole.readthedocs.io/

解读GraphQL(一): WHAT & WHY:https://zhuanlan.zhihu.com/p/26522359

2018前端框架深度抉择:http://www.harleyzhuge.com/2018/11/09/2018%E5%89%8D%E7%AB%AF%E6%A1%86%E6%9E%B6%E6%B7%B1%E5%BA%A6%E6%8A%89%E6%8B%A9/

Python ToDoList 项目文档

发表于 2020-11-29 | 分类于 Python 培训

环境安装

Python 安装

下载地址:https://www.python.org/downloads/,下载版本为 3.6+ 即可。示例地址:https://www.python.org/downloads/release/python-386/。可在列表中根据自己使用机器进行选择性下载。

Pycharm 安装

下载地址:https://www.jetbrains.com/pycharm/download/#section=windows

Pycharm 激活

如果下载的是 Professional 版本的 Pycharm ,那么将 Pycharm 安装完成之后需要进行激活操作,激活方式可以分为账号激活,激活码激活,激活URL等方式,这里采用激活码的方式进行。文档地址:https://github.com/wenyanjun/free。为了方便,已将激活码复制到地址 https://paste.ubuntu.com/p/XSfKGFFFt3/ ,只需复制激活码到 Pycharm 中激活即可。

ToDoList

在线体验地址 http://www.todolist.cn/

需求分析

添加 ToDo

在添加 ToDo 输入框中,输入待办的 ToDo,那么该 ToDo 就会添加到正在进行的列表中

完成 ToDo

勾选 ”正在进行“ 中的待办 ToDo,那么就修改该 ToDo 的状态为已完成

修改 ToDo

可将 ”正在进行“ 中的待办 ToDo 的状态修改为 “已经完成”

可以修改 “正在进行” 中的待办的 ToDo 的内容。例如将 “今晚去游泳” 修改为 “今晚去聚餐”

已经完成的 ToDo 不能进行修改,只能被删除

删除 ToDo

可对正在进行或者已经完成的 ToDo 进行删除

删除所有 ToDo

支持一键删除所有的 ToDo,包含 “正在进行” 和 “已经完成” 两种状态的

获取所有 ToDo

需要提供查询接口,查询所有的 ToDo,或者单独查询 “正在进行” 的 ToDo,或者单独查询 “已经完成” 的 ToDo

模块拆分

在本项目中因为只涉及到 ToDo 的操作,所以可以将 ToDo 看作一个模块,只需开发该模块的增删改查接口即可。模块拆分的原则可参考系统架构设计模块拆分维度和原则, 模块拆分案例 - 5

问题:如果增加用户登录,注册,注销。评论等功能,那么该怎么拆分模块呢?

系统设计

项目结构设计

│─tolist
│  app.py  # 应用文件
│  database.py  # 数据库连接
│  models.py  # 数据库实体
│  test.db  # 数据库文件

数据库设计

create table todo
(
    id      INTEGER     not null
        primary key,
    name    VARCHAR(50) not null,
    status  SMALLINT    not null,
    deleted BOOLEAN     not null,
    check (deleted IN (0, 1))
);

ToDo 实体类设计

每个 ToDo 实体都有状态(正在进行、已经完成),名称,为了能唯一标识一个 ToDo,还可以为每个 ToDo 分配一个 ID,即实体类设计如下:

class ToDo(Base):
    __tablename__ = 'todo'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50), default="", nullable=False)
    status = Column(SMALLINT, default=0, nullable=False)

需求开发

Flask 入门

打开 Pycharm,选择 new project 新建一个 Flask 项目

项目搭建

上述创建一个 Flask 项目后,即可生成一个最简单的 Flask 应用,其中 app.py 内容如下:

from flask import Flask

app = Flask(__name__)


@app.route('/')
def hello_world():
    return 'Hello World!'


if __name__ == '__main__':
    app.run()

新建 database.py 文件,内容如下:

"""
https://dormousehole.readthedocs.io/en/latest/patterns/sqlalchemy.html
"""

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base

engine = create_engine('sqlite:///test.db', convert_unicode=True)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()

新建 models.py 文件,内容如下:

from sqlalchemy import Column, Integer, String, SMALLINT
from database import Base, engine


class ToDo(Base):
    __tablename__ = 'todo'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50), default="", nullable=False)
    status = Column(SMALLINT, default=0, nullable=False)

    def __init__(self, name=None, status=None):
        self.name = name
        self.status = status

    def __repr__(self):
        return '<ToDo %r>' % (self.name)


def init_db():
    Base.metadata.create_all(bind=engine)


if __name__ == '__main__':
    init_db()

接口开发

添加 ToDo

@app.route('/todo', methods=["POST"])
def create_todo():
    # 接收参数
    params = request.json
    # 判断参数是否存在
    if params:
        name = params.get("name")
        todo = ToDo(name=name, status=0)
        db_session.add(todo)
        db_session.commit()
        return {"status": 1, "message": "success"}
    else:
        return {"status": 1, "message": "params error"}, 401

完成 ToDo/修改 ToDo

@app.route('/todo/<string:todo_id>', methods=["PUT"])
def update_todo(todo_id):
    data = {}
    # 接收参数
    params = request.json
    # 判断参数是否存在
    if params:
        if params.get("status") is not None:
            data.update({"status": bool(params.get("status"))})
        if params.get("name") is not None:
            data.update({"name": params.get("name")})
        try:
            rows = db_session.query(ToDo).filter(ToDo.id == todo_id, ToDo.deleted != True).update(data)
            if not rows:
                return {"status": 0, "message": f"update error"}, 401
            db_session.commit()
        except Exception as e:
            return {"status": 0, "message": f"update error: {e}"}, 401
        return {"status": 1, "message": "success"}
    else:
        return {"status": 1, "message": "params error"}, 401

删除 ToDo/删除所有 ToDo

@app.route('/todo/', methods=["DELETE"])
@app.route('/todo/<string:todo_id>', methods=["DELETE"])
def delete_todo(todo_id=None):
    try:
        if todo_id:
            rows = db_session.query(ToDo).filter(ToDo.id == todo_id).update({"deleted": True})
        else:
            rows = db_session.query(ToDo).update({"deleted": True})
        if not rows:
            return {"status": 0, "message": "delete error"}, 401
        db_session.commit()
    except Exception as e:
        return {"status": 0, "message": f"delete error: {e}"}, 401
    return {"status": 1, "message": "success"}

获取所有 ToDo

@app.route('/todo/<string:todo_id>', methods=["PUT"])
def update_todo(todo_id):
    data = {}
    # 接收参数
    params = request.json
    # 判断参数是否存在
    if params:
        if params.get("status") is not None:
            data.update({"status": bool(params.get("status"))})
        if params.get("name") is not None:
            data.update({"name": params.get("name")})
        try:
            rows = db_session.query(ToDo).filter(ToDo.id == todo_id, ToDo.deleted != True).update(data)
            if not rows:
                return {"status": 0, "message": f"update error"}, 401
            db_session.commit()
        except Exception as e:
            return {"status": 0, "message": f"update error: {e}"}, 401
        return {"status": 1, "message": "success"}
    else:
        return {"status": 1, "message": "params error"}, 401

功能测试

使用 postman 进行功能性测试,下载地址 https://www.postman.com/, chrome 插件地址: https://chrome.google.com/webstore/detail/postman-interceptor/aicmkgpgakddgnaphhhpliifpcfhicfo?utm_source=chrome-ntp-icon

源码信息

可在此处下载上述源码:https://raw.githubusercontent.com/rexyan/warehouse/master/tolist.zip

参考资料来源

ToDoList: http://www.todolist.cn/

Python官网:https://www.python.org

模块拆分案例 - 5:https://zhuanlan.zhihu.com/p/172528871

系统架构设计模块拆分维度和原则:https://blog.csdn.net/varyall/article/details/81461002

###

11. Flume-1

发表于 2020-09-18 | 分类于 Java 大数据进阶 , Flume

概念

Source: 用户要根据自己的数据源的类型,选择合适的 source 对象
Sink: 用户需要根据自己的数据存储的目的地的类型,选择合适的 sink 对象。
Interceptors: 在 source 将 event 放入到 channel 之前,调用拦截器对 event 进行拦截和处理。
Channel Selectors: 当一个 source 对接多个 channel 时,由 Channel Selectors选取 channel 将 event 存入。
Sink Processor: 当多个 sink 从一个 channel 取数据时,为了保证数据的顺序,由 sink processor 从多个 sink 中挑选一个 sink,由这个 sink 干活。

安装 Flume

版本区别
0.9 之前称为 flume og
0.9 之后为 flume ng(目前都使用此版本)
1.7之前,没有taildirsource,1.7及之后有taildirsource

安装

  1. 解压
tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
  1. 添加到环境变量
JAVA_HOME=/opt/module/jdk1.8.0_121
HADOOP_HOME=/opt/module/hadoop-2.7.2
HIVE_HOME=/opt/module/apache-hive-1.2.1-bin
FLUME_HOME=/opt/module/apache-flume-1.7.0-bin

PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$FLUME_HOME/bin

export JAVA_HOME PATH HADOOP_HOME HIVE_HOME FLUME_HOME

启动agent

flume-ng agent -n agent的名称 -f agent配置文件 -c 其他配置文件所在的目录 -Dproperty=value

监控端口数据案例

通过 netcat 工具向本机的 4444 端口发送数据,flume 监控本机的 4444 端口,通过 flume 的 source 端读取数据,flume 将获取的数据通过 sink 端写出到控制台。

选择 Source。因为 source 是 netcat,所以这里选择 netcat source,查看 netcat source 参数信息

选择 Sink。使用 logger(日志输出器)将 event 输出到文件或者控制台,所以这里选择 sink 的时候就选择 logger sink

选择 Channel。这里使用 Memory Channel,特点是高吞吐量,但是可能会丢失些数据。

所以配置文件如下

# a1 是 agent 的名称,a1 中定义了一个叫 r1 的 source,如果有多个,使用空格间隔
# a1 中定义了一个叫k1的 sink
# a1 中定义了一个叫c1的 channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 定义 source 的信息
# 因为 source r1 是 netcat,所以根据文档,需要配置 type bind port 三个参数。
a1.sources.r1.type=netcat
a1.sources.r1.bind=hadoop10
a1.sources.r1.port=44444

# 定义 sink 的信息
# 因为 sink k1 是 logger,所以根据文档,需要配置 type 参数,maxBytesToLog 是可选参数
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog=100

# 定义 chanel 的信息
# 因为 chanel c1 是 memory,所以根据文档,需要配置 type 参数,capacity 是可选参数
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

# 连接组件 同一个 source 可以对接多个 channel,一个 sink 只能从一个 channel 获取数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

在/opt/module/apache-flume-1.7.0-bin 目录下新建myagents目录,并创建名为 netcat-logger-memory.conf 的文件, 用于存储自定义的 agent 文件。内容就是上面的内容

这里需要注意,flume 的日志配置默认为文件中,所以需要修改 /opt/module/apache-flume-1.7.0-bin/conf/log4j.properties文件,将其中的 flume.root.logger 的值修改为 DEBUG,console

flume.root.logger=DEBUG,console
#flume.root.logger=INFO,LOGFILE

启动 agent

[[email protected] myagents]$ flume-ng agent -n a1 -c /opt/module/apache-flume-1.7.0-bin/conf/ -f /opt/module/apache-flume-1.7.0-bin/myagents/netcat-logger-memory.conf

在另一个终端里面安装 nc,并且启动 nc 发送消息

yum install -y nc  # 安装 nc
nc hadoop10 44444 # 启动 nc 并发送消息

实时读取本地文件到 HDFS 案例

Frp 内网穿透访问 Hadoop 集群

发表于 2020-09-18 | 分类于 奇淫技巧
Here's something encrypted, password is required to continue reading.
阅读全文 »
12…41

Rex

322 日志
35 分类
162 标签
RSS
Links
  • CSDN博客
  • rexyan.github.io
© 2017 — 2021 Rex
由 Hexo 强力驱动
|
主题 — NexT.Gemini v5.1.4
本站访客数 人次 本站总访问量 次