Skip to main content

日志食谱

Author:

Vinay Sajip <vinay_sajip at red-dove dot com>

此页面包含与日志记录相关的一些食谱,这些食谱在过去被发现有用。

在多个模块中使用日志记录

logging.getLogger('someLogger') 的多个调用返回对同一个记录器对象的引用。这不仅在同一个模块中,而且在模块之间,只要它在同一个Python解释器过程中。对同一对象的引用是真的;另外,应用程序代码可以在一个模块中定义和配置父记录器,并在单独的模块中创建(但不配置)子记录器,并且对该子进程的所有记录器调用都将传递给父进程。这里是一个主模块:

import logging
import auxiliary_module

# create logger with 'spam_application'
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)

logger.info('creating an instance of auxiliary_module.Auxiliary')
a = auxiliary_module.Auxiliary()
logger.info('created an instance of auxiliary_module.Auxiliary')
logger.info('calling auxiliary_module.Auxiliary.do_something')
a.do_something()
logger.info('finished auxiliary_module.Auxiliary.do_something')
logger.info('calling auxiliary_module.some_function()')
auxiliary_module.some_function()
logger.info('done with auxiliary_module.some_function()')

这里是辅助模块:

import logging

# create logger
module_logger = logging.getLogger('spam_application.auxiliary')

class Auxiliary:
    def __init__(self):
        self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
        self.logger.info('creating an instance of Auxiliary')

    def do_something(self):
        self.logger.info('doing something')
        a = 1 + 1
        self.logger.info('done doing something')

def some_function():
    module_logger.info('received a call to "some_function"')

输出如下所示:

2005-03-23 23:47:11,663 - spam_application - INFO -
   creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
   creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
   created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
   calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
   doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
   done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
   finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
   calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
   received a call to 'some_function'
2005-03-23 23:47:11,673 - spam_application - INFO -
   done with auxiliary_module.some_function()

从多个线程记录

从多个线程进行日志记录不需要特殊的工作。以下示例显示从主(初始)线程和另一个线程进行日志记录:

import logging
import threading
import time

def worker(arg):
    while not arg['stop']:
        logging.debug('Hi from myfunc')
        time.sleep(0.5)

def main():
    logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
    info = {'stop': False}
    thread = threading.Thread(target=worker, args=(info,))
    thread.start()
    while True:
        try:
            logging.debug('Hello from main')
            time.sleep(0.75)
        except KeyboardInterrupt:
            info['stop'] = True
            break
    thread.join()

if __name__ == '__main__':
    main()

运行时,脚本应该打印如下内容:

   0 Thread-1 Hi from myfunc
   3 MainThread Hello from main
 505 Thread-1 Hi from myfunc
 755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc

这显示日志输出散布在一个可能的期望。这种方法适用于比这里显示的更多的线程。

多个处理程序和格式化程序

记录器是纯Python对象。 addHandler() 方法没有您可以添加的处理程序数量的最小或最大配额。有时,一个应用程序将所有严重性的所有消息记录到文本文件,同时记录错误或以上到控制台将是有益的。要设置它,只需配置适当的处理程序。应用程序代码中的日志调用将保持不变。这里稍微修改了以前的简单的基于模块的配置示例:

import logging

logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(ch)
logger.addHandler(fh)

# 'application' code
logger.debug('debug message')
logger.info('info message')
logger.warn('warn message')
logger.error('error message')
logger.critical('critical message')

注意,’应用程序’代码不关心多个处理程序。所有改变的是添加和配置一个名为 fh 的新处理程序。

在编写和测试应用程序时,创建具有更高或更低严重性过滤器的新处理程序的能力可能非常有用。而不是使用许多 print 语句进行调试,使用 logger.debug:与print语句不同,稍后您将必须删除或注释掉,logger.debug语句可以在源代码中保持不变,并保持休眠状态,直到您再次需要它们为止。在那时,需要发生的唯一变化是修改记录器和/或处理器的严重性级别以进行调试。

记录到多个目的地

假设您要以不同的消息格式和在不同的情况下登录到控制台和文件。假设您想要将具有DEBUG及更高级别的消息记录到文件,将那些级别为INFO及更高级别的消息记录到控制台。让我们假设文件应该包含时间戳,但控制台消息不应该。这里是如何实现这一点:

import logging

# set up logging to file - see previous section for more details
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                    datefmt='%m-%d %H:%M',
                    filename='/temp/myapp.log',
                    filemode='w')
# define a Handler which writes INFO messages or higher to the sys.stderr
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# set a format which is simpler for console use
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

当你运行这个,在控制台上你会看到

root        : INFO     Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO     How quickly daft jumping zebras vex.
myapp.area2 : WARNING  Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR    The five boxing wizards jump quickly.

并在文件中你会看到类似的

10-22 22:19 root         INFO     Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1  DEBUG    Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1  INFO     How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2  WARNING  Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2  ERROR    The five boxing wizards jump quickly.

如您所见,DEBUG消息只显示在文件中。其他消息将发送到两个目标。

此示例使用控制台和文件处理程序,但您可以使用任何数量和组合的处理程序您选择。

配置服务器示例

下面是使用日志配置服务器的模块示例:

import logging
import logging.config
import time
import os

# read initial config file
logging.config.fileConfig('logging.conf')

# create and start listener on port 9999
t = logging.config.listen(9999)
t.start()

logger = logging.getLogger('simpleExample')

try:
    # loop through logging calls to see the difference
    # new configurations make, until Ctrl+C is pressed
    while True:
        logger.debug('debug message')
        logger.info('info message')
        logger.warn('warn message')
        logger.error('error message')
        logger.critical('critical message')
        time.sleep(5)
except KeyboardInterrupt:
    # cleanup
    logging.config.stopListening()
    t.join()

这里是一个脚本,它需要一个文件名,并将该文件发送到服务器,正确的二进制编码长度,作为新的日志配置:

#!/usr/bin/env python
import socket, sys, struct

with open(sys.argv[1], 'rb') as f:
    data_to_send = f.read()

HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')

处理阻塞的处理程序

有时你必须让你的日志处理程序做他们的工作,而不阻塞你正在记录的线程。这在Web应用程序中很常见,但当然也会发生在其他情况下。

显示缓慢行为的常见罪魁祸首是 SMTPHandler:发送电子邮件可能需要很长时间,由于开发人员控制之外的一些原因(例如,性能不佳的邮件或网络基础设施)。但是几乎任何基于网络的处理程序都可以阻止:即使是一个 SocketHandler 操作也可以在引擎下做一个太慢的DNS查询(这个查询可以在套接字库代码中深入,在Python层下面,在你的控制之外)。

一个解决方案是使用两部分方法。对于第一部分,仅将 QueueHandler 附加到从性能关键线程访问的那些记录器。他们只是写入他们的队列,可以将大小调整到足够大的容量或初始化,没有上限的大小。对队列的写入通常会被快速接受,尽管您可能需要捕获 queue.Full 异常作为代码中的预防措施。如果你是一个在他们的代码中有性能关键线程的图书馆开发人员,一定要记录这一点(以及建议只将 QueueHandlers 附加到你的日志记录器),以利于其他使用你的代码的开发人员。

解决方案的第二部分是 QueueListener,它已经被设计为 QueueHandler 的对应物。 QueueListener 非常简单:它通过一个队列和一些处理程序,它触发一个内部线程,监听从 QueueHandlers (或任何其他源的 LogRecords,发送)的LogRecords的队列。 LogRecords 从队列中删除并传递给处理程序进行处理。

拥有单独的 QueueListener 类的优点是,您可以使用相同的实例来服务多个 QueueHandlers。这比例如具有现有处理程序类的线程版本更有资源友好性,这将使每个处理程序吃一个线程而没有特别的好处。

使用这两个类的示例如下(导入省略):

que = queue.Queue(-1)  # no limit on size
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# The log output will display the thread which generated
# the event (the main thread) rather than the internal
# thread which monitors the internal queue. This is what
# you want to happen.
root.warning('Look out!')
listener.stop()

当运行时,将产生:

MainThread: Look out!

在 3.5 版更改: 在Python 3.5之前,QueueListener 总是将从队列接收的每条消息传递给它初始化的每个处理程序。 (这是因为假定级别过滤全部在另一侧完成,其中队列被填充。)从3.5开始,可以通过将关键字参数 respect_handler_level=True 传递给监听器的构造函数来改变这种行为。当这样做时,监听器将每个消息的级别与处理程序的级别进行比较,并且只有在适当的情况下才将消息传递给处理程序。

通过网络发送和接收日志记录事件

假设您要在网络上发送日志记录事件,并在接收端处理它们。一个简单的方法是将 SocketHandler 实例附加到发送端的根记录器:

import logging, logging.handlers

rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
                    logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

在接收端,您可以使用 socketserver 模块设置接收器。这里是一个基本的工作示例:

import pickle
import logging
import logging.handlers
import socketserver
import struct


class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Handler for a streaming logging request.

    This basically logs the record using whatever logging policy is
    configured locally.
    """

    def handle(self):
        """
        Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.
        """
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def unPickle(self, data):
        return pickle.loads(data)

    def handleLogRecord(self, record):
        # if a name is specified, we use the named logger rather than the one
        # implied by the record.
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name
        logger = logging.getLogger(name)
        # N.B. EVERY record gets logged. This is because Logger.handle
        # is normally called AFTER logger-level filtering. If you want
        # to do filtering, do it at the client end to save wasting
        # cycles and network bandwidth!
        logger.handle(record)

class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
    """
    Simple TCP socket-based logging receiver suitable for testing.
    """

    allow_reuse_address = True

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
        self.abort = 0
        self.timeout = 1
        self.logname = None

    def serve_until_stopped(self):
        import select
        abort = 0
        while not abort:
            rd, wr, ex = select.select([self.socket.fileno()],
                                       [], [],
                                       self.timeout)
            if rd:
                self.handle_request()
            abort = self.abort

def main():
    logging.basicConfig(
        format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
    tcpserver = LogRecordSocketReceiver()
    print('About to start TCP server...')
    tcpserver.serve_until_stopped()

if __name__ == '__main__':
    main()

先运行服务器,然后是客户端。在客户端,控制台上不打印任何内容;在服务器端,你应该看到类似的东西:

About to start TCP server...
   59 root            INFO     Jackdaws love my big sphinx of quartz.
   59 myapp.area1     DEBUG    Quick zephyrs blow, vexing daft Jim.
   69 myapp.area1     INFO     How quickly daft jumping zebras vex.
   69 myapp.area2     WARNING  Jail zesty vixen who grabbed pay from quack.
   69 myapp.area2     ERROR    The five boxing wizards jump quickly.

请注意,在某些情况下,pickle有一些安全问题。如果这些影响到您,您可以通过覆盖 makePickle() 方法并在其上实现您的替代方法,以及修改上述脚本以使用替代序列化来使用备用序列化方案。

向记录输出中添加上下文信息

有时,您希望日志输出除了包含传递给日志记录调用的参数之外,还包含上下文信息。例如,在联网的应用中,可能期望在日志中记录客户特定信息(例如,远程客户端的用户名或IP地址)。虽然您可以使用 extra 参数来实现这一点,但以这种方式传递信息并不总是方便。虽然可能很容易在每个连接的基础上创建 Logger 实例,但这不是一个好主意,因为这些实例不是垃圾收集。虽然这在实践中不是问题,但是当 Logger 实例的数量取决于要在记录应用程序时使用的粒度级别时,如果 Logger 实例的数量变得无效,则可能难以管理。

使用LoggerAdapters传递上下文信息

一种简单的方法是使用 LoggerAdapter 类来传递上下文信息,以便与日志记录事件信息一起输出。这个类被设计成看起来像一个 Logger,所以你可以调用 debug()info()warning()error()exception()critical()log()。这些方法具有与其在 Logger 中的对应方相同的签名,因此可以可交换地使用这两种类型的实例。

当创建 LoggerAdapter 的实例时,您传递一个 Logger 实例和一个包含您的上下文信息的类似dict的对象。当您调用 LoggerAdapter 实例上的一个日志记录方法时,它会将调用委派给传递给其构造函数的 Logger 的底层实例,并安排在委派的调用中传递上下文信息。这里是 LoggerAdapter 的代码片段:

def debug(self, msg, *args, **kwargs):
    """
    Delegate a debug call to the underlying logger, after adding
    contextual information from this adapter instance.
    """
    msg, kwargs = self.process(msg, kwargs)
    self.logger.debug(msg, *args, **kwargs)

LoggerAdapterprocess() 方法是将上下文信息添加到日志记录输出的地方。它传递了记录调用的消息和关键字参数,并且传回(潜在地)这些的修改版本以用于对底层记录器的调用。此方法的默认实现仅留下消息,但在关键字参数中插入一个“extra”键,该参数的值是传递给构造函数的dict类对象。当然,如果你在调用适配器时传递了一个’extra’关键字参数,它将被静默覆盖。

使用“extra”的优点是dict类对象中的值被合并到 LogRecord 实例的__dict__中,允许你使用你的 Formatter 实例使用定制的字符串,这些实例知道dict类对象的键。如果您需要其他方法,例如如果你想在消息字符串前面添加或附加上下文信息,你只需要继承 LoggerAdapter 并重写 process() 来做你需要的。这里是一个简单的例子:

class CustomAdapter(logging.LoggerAdapter):
    """
    This example adapter expects the passed in dict-like object to have a
    'connid' key, whose value in brackets is prepended to the log message.
    """
    def process(self, msg, kwargs):
        return '[%s] %s' % (self.extra['connid'], msg), kwargs

你可以这样使用:

logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})

然后,您登录到适配器的任何事件将在日志消息前添加 some_conn_id 的值。

使用除日志以外的对象传递上下文信息

你不需要传递一个实际的dict到 LoggerAdapter - 你可以传递一个实现 __getitem____iter__ 的类的实例,使它看起来像一个dict来记录。这将是有用的,如果你想动态生成值(而dict中的值将是不变的)。

使用过滤器来传递上下文信息

您还可以使用用户定义的 Filter 添加上下文信息以记录输出。允许 Filter 实例修改传递给它们的 LogRecords,包括添加附加属性,然后可以使用合适的格式字符串输出,或者如果需要,可以输出自定义 Formatter

例如,在web应用中,正被处理的请求(或至少其感兴趣的部分)可以存储在线程本地(threading.local)变量中,然后从 Filter 访问以添加来自请求的信息,例如,远程IP地址和远程用户的用户名 - 到 LogRecord,使用上面的 LoggerAdapter 示例中的属性名称’ip’和’user’。在这种情况下,可以使用相同的格式字符串来获得与上面所示相似的输出。这里是一个示例脚本:

import logging
from random import choice

class ContextFilter(logging.Filter):
    """
    This is a filter which injects contextual information into the log.

    Rather than use actual contextual information, we just use random
    data in this demo.
    """

    USERS = ['jim', 'fred', 'sheila']
    IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']

    def filter(self, record):

        record.ip = choice(ContextFilter.IPS)
        record.user = choice(ContextFilter.USERS)
        return True

if __name__ == '__main__':
    levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
    a1 = logging.getLogger('a.b.c')
    a2 = logging.getLogger('d.e.f')

    f = ContextFilter()
    a1.addFilter(f)
    a2.addFilter(f)
    a1.debug('A debug message')
    a1.info('An info message with %s', 'some parameters')
    for x in range(10):
        lvl = choice(levels)
        lvlname = logging.getLevelName(lvl)
        a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')

当运行时,产生类似的东西:

2010-09-06 22:38:15,292 a.b.c DEBUG    IP: 123.231.231.123 User: fred     A debug message
2010-09-06 22:38:15,300 a.b.c INFO     IP: 192.168.0.1     User: sheila   An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 127.0.0.1       User: jim      A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 127.0.0.1       User: sheila   A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 123.231.231.123 User: fred     A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1     User: jim      A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 192.168.0.1     User: jim      A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR    IP: 127.0.0.1       User: sheila   A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG    IP: 123.231.231.123 User: fred     A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO     IP: 123.231.231.123 User: fred     A message at INFO level with 2 parameters

从多个进程记录到单个文件

尽管日志记录是线程安全的,并且在单个进程中从多个线程记录到单个文件 is 支持,从 多个过程 记录到单个文件是 not 支持的,因为没有标准的方法来串行化跨多个进程的单个文件的访问在Python。如果需要从多个进程记录到单个文件,一种方法是让所有进程记录到 SocketHandler,并有一个单独的进程,实现一个套接字服务器从套接字读取并记录到文件。 (如果你愿意的话,你可以在一个现有进程中使用一个线程来执行这个功能。) 本节 更详细地记录这种方法,包括一个工作套接字接收器,可以作为你自己适应的起点应用程序。

如果您使用的是包含 multiprocessing 模块的最新版本的Python,您可以编写自己的处理程序,该处理程序使用该模块中的 Lock 类来顺序访问来自您的进程的文件。现有的 FileHandler 和子类目前不使用 multiprocessing,虽然他们可能在未来这样做。请注意,目前,multiprocessing 模块不在所有平台上提供工作锁定功能(请参阅 https://bugs.python.org/issue3770)。

或者,您可以使用 QueueQueueHandler 将所有日志记录事件发送到多进程应用程序中的其中一个进程。以下示例脚本演示了如何执行此操作;在示例中,单独的侦听器进程侦听由其他进程发送的事件,并根据其自己的日志记录配置记录它们。虽然该示例仅演示了一种方法(例如,您可能想使用侦听器线程,而不是一个单独的侦听器进程 - 实现将是类似的),它允许为侦听器和其他的完全不同的日志配置进程,并且可以用作满足您自己的特定需求的代码的基础:

# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing

# Next two import lines for this demo only
from random import choice, random
import time

#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords)on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

# Arrays used for random selections in this demo

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]

# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)

# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
    configurer(queue)
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)
    print('Worker finished: %s' % name)

# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()

if __name__ == '__main__':
    main()

上述脚本的变体在单独的线程中保持主进程中的日志记录:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time

def logger_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)


def worker_process(q):
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(qh)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)

if __name__ == '__main__':
    q = Queue()
    d = {
        'version': 1,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO',
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'level': 'ERROR',
                'formatter': 'detailed',
            },
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console', 'file', 'errors']
        },
    }
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    logging.config.dictConfig(d)
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()

此变体显示您可以应用特定记录器的配置 - 例如。 foo 记录器具有特殊处理器,其将 foo 子系统中的所有事件存储在文件 mplog-foo.log 中。这将由主进程中的日志记录机制使用(即使在工作进程中生成日志记录事件),以将消息定向到适当的目标。

使用文件旋转

有时你想让一个日志文件增长到一定的大小,然后打开一个新文件并登录。您可能希望保留一定数量的这些文件,并且当创建了许多文件时,旋转文件,以便文件数量和文件大小都保持有界。对于此使用模式,日志记录包提供了 RotatingFileHandler:

import glob
import logging
import logging.handlers

LOG_FILENAME = 'logging_rotatingfile_example.out'

# Set up a specific logger with our desired output level
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)

# Add the log message handler to the logger
handler = logging.handlers.RotatingFileHandler(
              LOG_FILENAME, maxBytes=20, backupCount=5)

my_logger.addHandler(handler)

# Log some messages
for i in range(20):
    my_logger.debug('i = %d' % i)

# See what files are created
logfiles = glob.glob('%s*' % LOG_FILENAME)

for filename in logfiles:
    print(filename)

结果应该是6个单独的文件,每个文件都有应用程序的部分日志历史记录:

logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5

最新的文件总是 logging_rotatingfile_example.out,并且每次达到大小限制时,它将用后缀 .1 重命名。每个现有的备份文件被重命名以增加后缀(.1 变为 .2 等),并且 .6 文件被擦除。

显然,此示例将日志长度设置为太小作为极端示例。您需要将 maxBytes 设置为适当的值。

使用替代格式化样式

当日志记录被添加到Python标准库时,格式化具有可变内容的消息的唯一方法是使用%-formatting方法。从那时起,Python已经获得了两种新的格式化方法:string.Template (在Python 2.4中添加)和 str.format() (在Python 2.6中添加)。

日志记录(从3.2开始)为这两种额外的格式化风格提供了改进的支持。 Formatter 类已增强,以采用名为 style 的附加可选关键字参数。默认为 '%',但其他可能的值为 '{''$',它们对应于其他两种格式化样式。默认情况下保持向后兼容性(如您所期望的),但通过显式指定样式参数,您可以指定与 str.format()string.Template 一起使用的格式字符串。下面是一个示例控制台会话,以显示可能性:

>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
...                        style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG    This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
...                        style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>

请注意,用于最终输出到日志的日志消息的格式化完全独立于如何构建单个日志消息。仍然可以使用%格式,如下所示:

>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>

记录调用(logger.debug()logger.info() 等)只采用实际日志消息本身的位置参数,关键字参数仅用于确定如何处理实际日志调用的选项(例如,exc_info 关键字参数指示应记录回溯信息,或 extra 关键字参数以指示要添加到日志的附加上下文信息)。因此,您不能使用 str.format()string.Template 语法直接进行日志调用,因为内部日志记录包使用%-formatting来合并格式字符串和变量参数。在保持向下兼容性的情况下不会改变这一点,因为现有代码中的所有日志调用都将使用%-format字符串。

但是,有一种方法,您可以使用{} - 和$ - 格式化来构建您的单个日志消息。回想一下,对于消息,您可以使用任意对象作为消息格式字符串,并且日志包将在该对象上调用 str() 以获取实际的格式字符串。考虑下面两个类:

class BraceMessage:
    def __init__(self, fmt, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage:
    def __init__(self, fmt, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

可以使用其中任何一个替换格式字符串,以允许{}或$格式化用于构建出现在格式化日志输出中的实际“消息”部分,而不是“%(message)s”或“{message}”或“$message”。这是一个有点难以使用类名称,当你想记录的东西,但它是相当可口的,如果你使用别名,如 __ (双下划线—不要与 _ 混淆,单个下划线用作同义词/别名为 gettext.gettext() 或其兄弟)。

上面的类不包括在Python中,虽然它们很容易复制和粘贴到你自己的代码。它们可以如下使用(假设它们在一个名为 wherever 的模块中声明):

>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
...       point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

虽然上述示例使用 print() 来显示格式化的工作原理,但您当然可以使用 logger.debug() 或类似的方法使用此方法实际进行日志记录。

需要注意的一点是,使用这种方法不会产生显着的性能损失:实际的格式化不会发生在进行日志调用时,而是当(和if)记录的消息实际上要由处理程序输出到日志。所以唯一不寻常的事情可能会让你失望的是括号绕过格式字符串和参数,而不只是格式字符串。这是因为__符号只是用于对一个XXXMessage类的构造函数调用的语法糖。

如果你喜欢,你可以使用 LoggerAdapter 实现类似的效果,如下面的例子:

import logging

class Message(object):
    def __init__(self, fmt, args):
        self.fmt = fmt
        self.args = args

    def __str__(self):
        return self.fmt.format(*self.args)

class StyleAdapter(logging.LoggerAdapter):
    def __init__(self, logger, extra=None):
        super(StyleAdapter, self).__init__(logger, extra or {})

    def log(self, level, msg, *args, **kwargs):
        if self.isEnabledFor(level):
            msg, kwargs = self.process(msg, kwargs)
            self.logger._log(level, Message(msg, args), (), **kwargs)

logger = StyleAdapter(logging.getLogger(__name__))

def main():
    logger.debug('Hello, {}', 'world!')

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    main()

当使用Python 3.2或更高版本运行时,上述脚本应记录消息 Hello, world!

定制 LogRecord

每个日志记录事件由 LogRecord 实例表示。当事件被记录并且不被记录器的级别过滤时,创建 LogRecord,用关于事件的信息填充,然后传递给该记录器的处理程序(及其祖先,直到并包括记录器,其中进一步传播层次结构被禁用)。在Python 3.2之前,只有两个地方完成了这个创建:

  • Logger.makeRecord(),它在记录事件的正常过程中调用。这直接调用 LogRecord 来创建实例。

  • makeLogRecord(),它用包含要添加到LogRecord的属性的字典调用。这通常在已经通过网络接收到合适的字典时被调用(例如,经由 SocketHandler 以pickle形式或经由 HTTPHandler 以JSON形式)。

这通常意味着,如果你需要做一些特殊的 LogRecord,你不得不做以下之一。

第一种方法是在一些情况下(例如)几个不同的图书馆想做不同的事情有点笨重。每个人都会尝试设置自己的 Logger 子类,并且最后一个将获胜。

第二种方法在许多情况下运行得相当好,但不允许你。使用 LogRecord 的专门子类。库开发人员可以在他们的日志记录器上设置合适的过滤器,但是他们必须记住每次他们引入一个新的日志记录器时都这样做(他们只需要添加新的包或模块,

logger = logging.getLogger(__name__)

在模块级)。这可能是一个太多的事情要考虑。开发人员还可以将过滤器添加到附加到它们的顶层记录器的 NullHandler,但是如果应用程序开发者将处理程序附加到较低级别的库记录器,则不会调用此过滤器 - 因此该处理程序的输出不会反映意图的图书馆开发人员。

在Python 3.2和更高版本中,LogRecord 创建是通过工厂完成的,您可以指定它。工厂只是一个可以调用您可以设置与 setLogRecordFactory(),并询问与 getLogRecordFactory()。使用与 LogRecord 构造函数相同的签名调用工厂,因为 LogRecord 是工厂的默认设置。

这种方法允许自定义工厂控制LogRecord创建的所有方面。例如,您可以返回一个子类,或者只要在创建后向记录添加一些附加属性,使用类似于此的模式:

old_factory = logging.getLogRecordFactory()

def record_factory(*args, **kwargs):
    record = old_factory(*args, **kwargs)
    record.custom_attribute = 0xdecafbad
    return record

logging.setLogRecordFactory(record_factory)

此模式允许不同的库将工厂链接在一起,并且只要它们不会覆盖彼此的属性或无意地覆盖作为标准提供的属性,就不会有意外。但是,应该记住,链中的每个链接都会增加所有日志记录操作的运行时开销,并且只有在使用 Filter 不能提供所需结果时,才应该使用该技术。

子类化QueueHandler - 一个ZeroMQ示例

您可以使用 QueueHandler 子类将消息发送到其他类型的队列,例如ZeroMQ的“发布”套接字。在下面的示例中,套接字是单独创建的,并传递给处理程序(作为其“队列”):

import zmq   # using pyzmq, the Python binding for ZeroMQ
import json  # for serializing records portably

ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB)  # or zmq.PUSH, or other suitable value
sock.bind('tcp://*:5556')        # or wherever

class ZeroMQSocketHandler(QueueHandler):
    def enqueue(self, record):
        data = json.dumps(record.__dict__)
        self.queue.send(data)

handler = ZeroMQSocketHandler(sock)

当然还有其他组织方式,例如传递处理程序所需的数据来创建套接字:

class ZeroMQSocketHandler(QueueHandler):
    def __init__(self, uri, socktype=zmq.PUB, ctx=None):
        self.ctx = ctx or zmq.Context()
        socket = zmq.Socket(self.ctx, socktype)
        socket.bind(uri)
        QueueHandler.__init__(self, socket)

    def enqueue(self, record):
        data = json.dumps(record.__dict__)
        self.queue.send(data)

    def close(self):
        self.queue.close()

子类化QueueListener - 一个ZeroMQ示例

您还可以将 QueueListener 子类化为从其他类型的队列获取消息,例如ZeroMQ的订阅套接字。这里有一个例子:

class ZeroMQSocketListener(QueueListener):
    def __init__(self, uri, *handlers, **kwargs):
        self.ctx = kwargs.get('ctx') or zmq.Context()
        socket = zmq.Socket(self.ctx, zmq.SUB)
        socket.setsockopt(zmq.SUBSCRIBE, '')  # subscribe to everything
        socket.connect(uri)

    def dequeue(self):
        msg = self.queue.recv()
        return logging.makeLogRecord(json.loads(msg))

参见

模块 logging

日志模块的API参考。

模块 logging.config

日志模块的配置API。

模块 logging.handlers

有用的处理程序包括在日志模块中。

一个基本的日志教程

更高级的日志教程

基于字典的示例配置

下面是一个日志配置字典的例子 - 它取自 关于Django项目的文档。此字典传递到 dictConfig() 以使配置生效:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
        },
        'simple': {
            'format': '%(levelname)s %(message)s'
        },
    },
    'filters': {
        'special': {
            '()': 'project.logging.SpecialFilter',
            'foo': 'bar',
        }
    },
    'handlers': {
        'null': {
            'level':'DEBUG',
            'class':'django.utils.log.NullHandler',
        },
        'console':{
            'level':'DEBUG',
            'class':'logging.StreamHandler',
            'formatter': 'simple'
        },
        'mail_admins': {
            'level': 'ERROR',
            'class': 'django.utils.log.AdminEmailHandler',
            'filters': ['special']
        }
    },
    'loggers': {
        'django': {
            'handlers':['null'],
            'propagate': True,
            'level':'INFO',
        },
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': False,
        },
        'myproject.custom': {
            'handlers': ['console', 'mail_admins'],
            'level': 'INFO',
            'filters': ['special']
        }
    }
}

有关此配置的更多信息,您可以查看Django文档的 相关部分

使用旋转器和命名器来自定义日志旋转处理

在以下代码段中给出了如何定义命名和旋转器的示例,其中显示了基于zlib的日志文件压缩:

def namer(name):
    return name + ".gz"

def rotator(source, dest):
    with open(source, "rb") as sf:
        data = sf.read()
        compressed = zlib.compress(data, 9)
        with open(dest, "wb") as df:
            df.write(compressed)
    os.remove(source)

rh = logging.handlers.RotatingFileHandler(...)
rh.rotator = rotator
rh.namer = namer

这些不是“真正的”.gz文件,因为它们是裸压缩数据,没有“容器”,如在一个实际的gzip文件中找到的。此代码段仅用于说明目的。

更复杂的多处理示例

以下工作示例显示如何使用日志记录与使用配置文件的多重处理。配置相当简单,但用于说明在实际多处理场景中如何实现更复杂的配置。

在该示例中,主进程生成侦听器进程和一些工作进程。每个主进程,监听器和工人都有三个单独的配置(工人都共享相同的配置)。我们可以在主进程中看到日志记录,工作者如何记录到QueueHandler以及监听器如何实现QueueListener和更复杂的日志配置,并安排将通过队列接收的事件分派到配置中指定的处理程序。请注意,这些配置纯粹是说明性的,但您应该能够将此示例适应您自己的方案。

这里是脚本 - docstrings和评论希望解释它是如何工作的:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time

class MyHandler:
    """
    A simple handler for logging events. It runs in the listener process and
    dispatches events to loggers based on the name in the received record,
    which then get dispatched, by the logging system, to the handlers
    configured for those loggers.
    """
    def handle(self, record):
        logger = logging.getLogger(record.name)
        # The process name is transformed just to show that it's the listener
        # doing the logging to files and console
        record.processName = '%s (for %s)' % (current_process().name, record.processName)
        logger.handle(record)

def listener_process(q, stop_event, config):
    """
    This could be done in the main process, but is just done in a separate
    process for illustrative purposes.

    This initialises logging according to the specified configuration,
    starts the listener and waits for the main process to signal completion
    via the event. The listener is then stopped, and the process exits.
    """
    logging.config.dictConfig(config)
    listener = logging.handlers.QueueListener(q, MyHandler())
    listener.start()
    if os.name == 'posix':
        # On POSIX, the setup logger will have been configured in the
        # parent process, but should have been disabled following the
        # dictConfig call.
        # On Windows, since fork isn't used, the setup logger won't
        # exist in the child, so it would be created and the message
        # would appear - hence the "if posix" clause.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    stop_event.wait()
    listener.stop()

def worker_process(config):
    """
    A number of these are spawned for the purpose of illustration. In
    practice, they could be a heterogeneous bunch of processes rather than
    ones which are identical to each other.

    This initialises logging according to the specified configuration,
    and logs a hundred messages with random levels to randomly selected
    loggers.

    A small sleep is added to allow other processes a chance to run. This
    is not strictly needed, but it mixes the output from the different
    processes a bit more than if it's left out.
    """
    logging.config.dictConfig(config)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    if os.name == 'posix':
        # On POSIX, the setup logger will have been configured in the
        # parent process, but should have been disabled following the
        # dictConfig call.
        # On Windows, since fork isn't used, the setup logger won't
        # exist in the child, so it would be created and the message
        # would appear - hence the "if posix" clause.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)
        time.sleep(0.01)

def main():
    q = Queue()
    # The main process gets a simple configuration which prints to the console.
    config_initial = {
        'version': 1,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO',
            },
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console']
        },
    }
    # The worker process configuration is just a QueueHandler attached to the
    # root logger, which allows all messages to be sent to the queue.
    # We disable existing loggers to disable the "setup" logger used in the
    # parent process. This is needed on POSIX because the logger will
    # be there in the child following a fork().
    config_worker = {
        'version': 1,
        'disable_existing_loggers': True,
        'handlers': {
            'queue': {
                'class': 'logging.handlers.QueueHandler',
                'queue': q,
            },
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['queue']
        },
    }
    # The listener process configuration shows that the full flexibility of
    # logging configuration is available to dispatch events to handlers however
    # you want.
    # We disable existing loggers to disable the "setup" logger used in the
    # parent process. This is needed on POSIX because the logger will
    # be there in the child following a fork().
    config_listener = {
        'version': 1,
        'disable_existing_loggers': True,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            },
            'simple': {
                'class': 'logging.Formatter',
                'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO',
                'formatter': 'simple',
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'level': 'ERROR',
                'formatter': 'detailed',
            },
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console', 'file', 'errors']
        },
    }
    # Log some initial events, just to show that logging in the parent works
    # normally.
    logging.config.dictConfig(config_initial)
    logger = logging.getLogger('setup')
    logger.info('About to create workers ...')
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1),
                     args=(config_worker,))
        workers.append(wp)
        wp.start()
        logger.info('Started worker: %s', wp.name)
    logger.info('About to create listener ...')
    stop_event = Event()
    lp = Process(target=listener_process, name='listener',
                 args=(q, stop_event, config_listener))
    lp.start()
    logger.info('Started listener')
    # We now hang around for the workers to finish their work.
    for wp in workers:
        wp.join()
    # Workers all done, listening can now stop.
    # Logging in the parent still works normally.
    logger.info('Telling listener to stop ...')
    stop_event.set()
    lp.join()
    logger.info('All done.')

if __name__ == '__main__':
    main()

在发送到SysLogHandler的消息中插入BOM

RFC 5424 要求将Unicode消息作为一组字节发送到syslog守护进程,这些字节具有以下结构:可选的纯ASCII组件,后跟UTF-8字节顺序标记(BOM),随后使用UTF- 8。 (见 相关章节的规范。)

在Python 3.1中,将代码添加到 SysLogHandler 以将BOM插入到消息中,但不幸的是,BOM出现在消息的开头,因此不允许任何纯ASCII组件出现在消息之前。

由于此行为已损坏,因此正在从Python 3.2.4及更高版本中删除不正确的BOM插入代码。但是,它不会被替换,并且如果要生成包括BOM的可靠的纯ASCII序列(在其之前的任意Unicode之后),使用UTF-8编码的RFC 5424兼容消息,那么您需要执行以下:

  1. Formatter 实例附加到 SysLogHandler 实例,并使用格式字符串,例如:

    'ASCII section\ufeffUnicode section'
    

    Unicode代码点U + FEFF,当使用UTF-8编码时,将被编码为UTF-8 BOM - 字节串 b'\xef\xbb\xbf'

  2. 用任何你喜欢的占位符替换ASCII部分,但确保替换后出现在那里的数据总是ASCII(这样,它将在UTF-8编码后保持不变)。

  3. 用任何你喜欢的占位符替换Unicode部分;如果替换后出现的数据包含ASCII范围之外的字符,那就很好 - 它将使用UTF-8编码。

格式化消息 will 使用 SysLogHandler 的UTF-8编码进行编码。如果遵循上述规则,您应该能够生成符合RFC 5424的消息。如果没有,日志记录可能不会抱怨,但是您的消息不符合RFC 5424,并且您的syslog守护程序可能会抱怨。

实现结构化日志

虽然大多数日志记录消息是用于人类阅读,因此不容易机器可解析,可能有一些情况下,你想要输出消息的结构化格式的 is 能够由程序解析(不需要复杂的正则表达式来解析日志消息)。这是直接实现使用日志包。有很多方法可以实现这一点,但是下面是一个简单的方法,它使用JSON以机器可分析的方式串行化事件:

import json
import logging

class StructuredMessage(object):
    def __init__(self, message, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        return '%s >>> %s' % (self.message, json.dumps(self.kwargs))

_ = StructuredMessage   # optional, to improve readability

logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))

如果上述脚本运行,则打印:

message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}

请注意,根据所使用的Python版本,项目的顺序可能不同。

如果您需要更专业的处理,可以使用自定义JSON编码器,如以下完整示例所示:

from __future__ import unicode_literals

import json
import logging

# This next bit is to ensure the script runs unchanged on 2.x and 3.x
try:
    unicode
except NameError:
    unicode = str

class Encoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, set):
            return tuple(o)
        elif isinstance(o, unicode):
            return o.encode('unicode_escape').decode('ascii')
        return super(Encoder, self).default(o)

class StructuredMessage(object):
    def __init__(self, message, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        s = Encoder().encode(self.kwargs)
        return '%s >>> %s' % (self.message, s)

_ = StructuredMessage   # optional, to improve readability

def main():
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))

if __name__ == '__main__':
    main()

当上述脚本运行时,它打印:

message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}

请注意,根据所使用的Python版本,项目的顺序可能不同。

使用 dictConfig() 定制处理程序

有时你想以特定的方式自定义日志处理程序,如果你使用 dictConfig(),你可以这样做而无需子类化。例如,考虑您可能要设置日志文件的所有权。在POSIX上,这很容易使用 shutil.chown() 完成,但stdlib中的文件处理程序不提供内置支持。您可以使用简单的函数(例如)来自定义处理程序创建:

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

然后,您可以在传递给 dictConfig() 的日志配置中指定通过调用此函数创建日志处理程序:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # The values below are popped from this dictionary and
            # used to create the handler, set the handler's level and
            # its formatter.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # The values below are passed to the handler creator callable
            # as keyword arguments.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

在这个例子中,我使用 pulse 用户和组设置所有权,仅仅是为了说明的目的。把它放在一个工作脚本,chowntest.py:

import logging, logging.config, os, shutil

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # The values below are popped from this dictionary and
            # used to create the handler, set the handler's level and
            # its formatter.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # The values below are passed to the handler creator callable
            # as keyword arguments.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('A debug message')

要运行这个,你可能需要作为 root 运行:

$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log

注意,这个例子使用Python 3.3,因为那是 shutil.chown() 出现的地方。这种方法应该支持任何支持 dictConfig() 的Python版本 - 即Python 2.7,3.2或更高版本。对于3.3之前的版本,您需要使用 os.chown()

在实践中,处理程序创建函数可能位于项目中某处的实用程序模块中。而不是配置中的行:

'()': owned_file_handler,

您可以使用:

'()': 'ext://project.util.owned_file_handler',

其中 project.util 可以替换为函数所在的包的实际名称。在上面的工作脚本中,使用 'ext://__main__.owned_file_handler' 应该工作。这里,实际可调用由 ext:// 规范的 dictConfig() 解决。

这个例子希望也指明了如何实现其他类型的文件更改的方式。设置特定的POSIX权限位 - 以相同的方式,使用 os.chmod()

当然,该方法还可以扩展到除 FileHandler 之外的处理程序类型 - 例如,旋转文件处理程序之一,或者完全不同类型的处理程序。

在应用程序中使用特定的格式化样式

在Python 3.2中,Formatter 获得了一个 style 关键字参数,它默认为 % 以实现向后兼容性,允许 {$ 的规范支持 str.format()string.Template 支持的格式化方法。注意,这控制日志消息的格式化以最终输出到日志,并且与如何构建单独的日志消息完全正交。

记录调用(debug()info() 等)只采用实际日志消息本身的位置参数,关键字参数仅用于确定如何处理日志调用的选项(例如,exc_info 关键字参数指示应记录回溯信息,或 extra 关键字参数以指示要添加到日志的附加上下文信息)。因此,您不能使用 str.format()string.Template 语法直接进行日志调用,因为内部日志记录包使用%-formatting来合并格式字符串和变量参数。在保持向下兼容性的情况下不会改变这一点,因为现有代码中的所有日志调用都将使用%-format字符串。

已经有建议将格式样式与特定的记录器相关联,但是该方法也会遇到向后兼容性问题,因为任何现有代码可能正在使用给定的记录器名称并使用%格式化。

要使日志记录能够在任何第三方库和代码之间互操作,在格式化的决策需要在单独的日志记录调用级别进行。这开辟了几种方式可以适应其他格式化风格。

使用LogRecord工厂

在Python 3.2中,以及上面提到的 Formatter 更改,日志包允许用户使用 setLogRecordFactory() 函数设置自己的 LogRecord 子类。你可以使用它来设置你自己的 LogRecord 子类,它通过重写 getMessage() 方法来做正确的事情。此方法的基类实现是 msg % args 格式化的地方,您可以在其中替换您的备用格式;但是,您应该小心支持所有格式化样式,并允许使用%格式作为默认格式,以确保与其他代码的互操作性。还应该注意调用 str(self.msg),就像基本实现一样。

有关详细信息,请参阅 setLogRecordFactory()LogRecord 上的参考文档。

使用自定义消息对象

还有另一种也许更简单的方法,你可以使用{} - 和$ - 格式化来构造你的单个日志消息。你可能还记得(从 使用任意对象作为消息),当记录时,你可以使用任意对象作为消息格式字符串,并且日志包将调用该对象上的 str() 以获取实际的格式字符串。考虑下面两个类:

class BraceMessage(object):
    def __init__(self, fmt, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage(object):
    def __init__(self, fmt, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

可以使用其中任何一个替换格式字符串,以允许{}或$格式化用于构建出现在格式化日志输出中的实际“消息”部分,而不是“%(message)s”或“{message}”或“$message”。如果你发现有点难以使用类名称每当你想记录的东西,你可以使它更好用,如果你使用别名,如 M_ 的消息(或者 __,如果你使用 _ 本地化)。

该方法的实例如下。首先,用 str.format() 格式化:

>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)

其次,用 string.Template 格式化:

>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

需要注意的一点是,使用这种方法不会产生显着的性能损失:实际的格式化不会发生在进行日志调用时,而是当(和if)记录的消息实际上要由处理程序输出到日志。所以唯一不寻常的事情可能会让你失望的是括号绕过格式字符串和参数,而不只是格式字符串。这是因为__符号只是一个构造函数调用上面显示的 XXXMessage 类之一的语法糖。

使用 dictConfig() 配置过滤器

can 使用 dictConfig() 配置过滤器,虽然它可能不是明显的乍一看如何做它(因此这个食谱)。由于 Filter 是包含在标准库中的唯一的过滤器类,并且它不太可能满足许多需求(它只是作为一个基类),你通常需要用一个重写的 filter() 方法定义你自己的 Filter 子类。为此,在过滤器的配置字典中指定 () 密钥,指定将用于创建过滤器的可调用项(类是最明显的,但您可以提供返回 Filter 实例的任何可调用项)。这里是一个完整的例子:

import logging
import logging.config
import sys

class MyFilter(logging.Filter):
    def __init__(self, param=None):
        self.param = param

    def filter(self, record):
        if self.param is None:
            allow = True
        else:
            allow = self.param not in record.msg
        if allow:
            record.msg = 'changed: ' + record.msg
        return allow

LOGGING = {
    'version': 1,
    'filters': {
        'myfilter': {
            '()': MyFilter,
            'param': 'noshow',
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'filters': ['myfilter']
        }
    },
    'root': {
        'level': 'DEBUG',
        'handlers': ['console']
    },
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.debug('hello')
    logging.debug('hello - noshow')

此示例显示如何以关键字参数的形式将配置数据传递到构建实例的可调用项。运行时,将打印上述脚本:

changed: hello

这表明过滤器按配置工作。

另外几点要注意:

  • 如果不能直接在配置中引用可调用对象(例如,如果它位于不同的模块中,并且不能直接在配置字典中导入),则可以使用 访问外部对象 中描述的形式 ext://...。例如,您可以在上面的示例中使用文本 'ext://__main__.MyFilter' 而不是 MyFilter

  • 与过滤器一样,此技术也可用于配置自定义处理程序和格式化程序。有关日志记录如何支持在其配置中使用用户定义对象的详细信息,请参阅 用户定义的对象,并参见上面的其他菜单食谱 使用 dictConfig() 定制处理程序

自定义异常格式

可能有时候你想要做自定义的异常格式化 - 为了参数的缘故,让我们说你想要每个记录的事件,即使当异常信息存在时,一行。您可以使用自定义格式化程序类来完成此操作,如以下示例所示:

import logging

class OneLineExceptionFormatter(logging.Formatter):
    def formatException(self, exc_info):
        """
        Format an exception so that it prints on a single line.
        """
        result = super(OneLineExceptionFormatter, self).formatException(exc_info)
        return repr(result)  # or format into one line however you want to

    def format(self, record):
        s = super(OneLineExceptionFormatter, self).format(record)
        if record.exc_text:
            s = s.replace('\n', '') + '|'
        return s

def configure_logging():
    fh = logging.FileHandler('output.txt', 'w')
    f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
                                  '%d/%m/%Y %H:%M:%S')
    fh.setFormatter(f)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(fh)

def main():
    configure_logging()
    logging.info('Sample message')
    try:
        x = 1 / 0
    except ZeroDivisionError as e:
        logging.exception('ZeroDivisionError: %s', e)

if __name__ == '__main__':
    main()

运行时,将生成一个具有两行的文件:

28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: integer division or modulo by zero|'Traceback (most recent call last):\n  File "logtest7.py", line 30, in main\n    x = 1 / 0\nZeroDivisionError: integer division or modulo by zero'|

虽然上面的处理是简单的,它指明了如何格式化您的喜好的异常信息的方式。 traceback 模块可能有助于更专业的需求。

说出记录消息

可能存在希望以可听而不是可见格式呈现日志消息的情况。如果您的系统中有文本到语音(TTS)功能,即使它没有Python绑定,这也很容易做到。大多数TTS系统都有一个可以运行的命令行程序,这可以使用 subprocess 从处理程序中调用。这里假设TTS命令行程序不会期望与用户交互或花费很长时间来完成,并且记录的消息的频率不会高到使用户使用消息沼泽,并且可以接受消息一次一个而不是并发地发出。下面的示例实现等待一个消息在下一个消息被处理之前被说出,并且这可能导致其他处理器被保持等待。下面是一个简短的例子,显示了该方法,假设 espeak TTS包可用:

import logging
import subprocess
import sys

class TTSHandler(logging.Handler):
    def emit(self, record):
        msg = self.format(record)
        # Speak slowly in a female English voice
        cmd = ['espeak', '-s150', '-ven+f3', msg]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        # wait for the program to finish
        p.communicate()

def configure_logging():
    h = TTSHandler()
    root = logging.getLogger()
    root.addHandler(h)
    # the default formatter just returns the message
    root.setLevel(logging.DEBUG)

def main():
    logging.info('Hello')
    logging.debug('Goodbye')

if __name__ == '__main__':
    configure_logging()
    sys.exit(main())

运行时,这个脚本应该说“你好”,然后在女声中“再见”。

上述方法当然可以适用于其它TTS系统,甚至可以适用于可以通过从命令行运行的外部程序处理消息的其它系统。

缓冲日志消息并有条件地输出

可能有些情况下,您希望在临时区域中记录消息,并且仅在发生特定条件时才输出消息。例如,您可能希望在函数中开始记录调试事件,如果函数完成时没有错误,则不希望使用收集的调试信息使日志混乱,但如果有错误,则需要所有调试要输出的信息以及错误。

这里是一个例子,显示如何使用装饰器为您的功能,你想要日志记录的行为这样做。它使用 logging.handlers.MemoryHandler,它允许缓冲记录的事件,直到某些条件发生,此时缓冲的事件被 flushed - 传递到另一个处理程序(target 处理程序)进行处理。默认情况下,MemoryHandler 在其缓冲区填满或者级别大于或等于指定阈值的事件被刷新时被刷新。如果想要定制冲洗行为,您可以使用此配方与更专门的 MemoryHandler 子类。

示例脚本有一个简单的函数 foo,它只是循环遍历所有日志记录级别,写入 sys.stderr 以说明要登录的级别,然后实际记录该级别的消息。您可以将参数传递给 foo,如果为true,将记录在ERROR和CRITICAL级别 - 否则,它只记录DEBUG,INFO和WARNING级别。

脚本只是安排用装饰器来装饰 foo,该装饰器将执行所需的条件日志记录。装饰器将记录器作为参数,并在调用装饰函数的持续时间内附加内存处理程序。可以使用目标处理程序,发生刷新的级别和缓冲器的容量来附加地对装饰器进行参数化。这些默认为分别写入 sys.stderrlogging.ERROR100StreamHandler

这里是脚本:

import logging
from logging.handlers import MemoryHandler
import sys

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
    if target_handler is None:
        target_handler = logging.StreamHandler()
    if flush_level is None:
        flush_level = logging.ERROR
    if capacity is None:
        capacity = 100
    handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)

    def decorator(fn):
        def wrapper(*args, **kwargs):
            logger.addHandler(handler)
            try:
                return fn(*args, **kwargs)
            except Exception:
                logger.exception('call failed')
                raise
            finally:
                super(MemoryHandler, handler).flush()
                logger.removeHandler(handler)
        return wrapper

    return decorator

def write_line(s):
    sys.stderr.write('%s\n' % s)

def foo(fail=False):
    write_line('about to log at DEBUG ...')
    logger.debug('Actually logged at DEBUG')
    write_line('about to log at INFO ...')
    logger.info('Actually logged at INFO')
    write_line('about to log at WARNING ...')
    logger.warning('Actually logged at WARNING')
    if fail:
        write_line('about to log at ERROR ...')
        logger.error('Actually logged at ERROR')
        write_line('about to log at CRITICAL ...')
        logger.critical('Actually logged at CRITICAL')
    return fail

decorated_foo = log_if_errors(logger)(foo)

if __name__ == '__main__':
    logger.setLevel(logging.DEBUG)
    write_line('Calling undecorated foo with False')
    assert not foo(False)
    write_line('Calling undecorated foo with True')
    assert foo(True)
    write_line('Calling decorated foo with False')
    assert not decorated_foo(False)
    write_line('Calling decorated foo with True')
    assert decorated_foo(True)

运行此脚本时,应遵循以下输出:

Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL

可以看到,实际日志记录输出仅在记录严重性为ERROR或更大的事件时发生,但在这种情况下,也会记录较低严重性的任何先前事件。

你当然可以使用传统的装饰方法:

@log_if_errors(logger)
def foo(fail=False):
    ...

格式化时间使用UTC(GMT)通过配置

有时你想使用UTC格式化时间,这可以使用类如 UTCFormatter,如下所示:

import logging
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

然后您可以在代码中使用 UTCFormatter 而不是 Formatter。如果您想通过配置执行此操作,可以使用 dictConfig() API,并采用以下完整示例所示的方法:

import logging
import logging.config
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'utc': {
            '()': UTCFormatter,
            'format': '%(asctime)s %(message)s',
        },
        'local': {
            'format': '%(asctime)s %(message)s',
        }
    },
    'handlers': {
        'console1': {
            'class': 'logging.StreamHandler',
            'formatter': 'utc',
        },
        'console2': {
            'class': 'logging.StreamHandler',
            'formatter': 'local',
        },
    },
    'root': {
        'handlers': ['console1', 'console2'],
   }
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.warning('The local time is %s', time.asctime())

当这个脚本运行时,它应该打印出来:

2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015

显示时间如何格式化为本地时间和UTC,每个处理程序一个。

使用上下文管理器进行选择性日志记录

有时候,临时更改日志配置和做某事后将其还原是有用的。为此,上下文管理器是保存和恢复日志记录上下文的最明显的方法。下面是一个这样的上下文管理器的简单示例,它允许您可选择地更改日志记录级别,并且仅在上下文管理器的范围内添加日志处理程序:

import logging
import sys

class LoggingContext(object):
    def __init__(self, logger, level=None, handler=None, close=True):
        self.logger = logger
        self.level = level
        self.handler = handler
        self.close = close

    def __enter__(self):
        if self.level is not None:
            self.old_level = self.logger.level
            self.logger.setLevel(self.level)
        if self.handler:
            self.logger.addHandler(self.handler)

    def __exit__(self, et, ev, tb):
        if self.level is not None:
            self.logger.setLevel(self.old_level)
        if self.handler:
            self.logger.removeHandler(self.handler)
        if self.handler and self.close:
            self.handler.close()
        # implicit return of None => don't swallow exceptions

如果指定级别值,记录器的级别将设置为上下文管理器覆盖的with块范围内的值。如果指定一个处理程序,它将在进入块时添加到记录器,并在退出块时删除。你也可以要求管理器在块退出时为你关闭处理程序 - 如果你不再需要处理程序,你可以这样做。

为了说明它是如何工作的,我们可以添加以下代码块到上面:

if __name__ == '__main__':
    logger = logging.getLogger('foo')
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)
    logger.info('1. This should appear just once on stderr.')
    logger.debug('2. This should not appear.')
    with LoggingContext(logger, level=logging.DEBUG):
        logger.debug('3. This should appear once on stderr.')
    logger.debug('4. This should not appear.')
    h = logging.StreamHandler(sys.stdout)
    with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
        logger.debug('5. This should appear twice - once on stderr and once on stdout.')
    logger.info('6. This should appear just once on stderr.')
    logger.debug('7. This should not appear.')

我们最初将记录器的级别设置为 INFO,因此消息#1出现,消息#2不出现。然后,我们在下面的 with 块中临时将级别更改为 DEBUG,因此出现消息#3。在块退出后,记录器的级别恢复为 INFO,因此不会出现消息#4。在下一个 with 块中,我们再次将级别设置为 DEBUG,但也添加了一个写入 sys.stdout 的处理程序。因此,消息#5在控制台上出现两次(一次通过 stderr,一次通过 stdout)。在 with 语句完成之后,状态为之前的状态,因此消息#6出现(如消息#1),而消息#7不出现(就像消息#2)。

如果我们运行生成的脚本,结果如下:

$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

如果我们再次运行它,但管道 stderr/dev/null,我们看到以下,这是写入 stdout 的唯一消息:

$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.

再次,但管道 stdout/dev/null,我们得到:

$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

在这种情况下,打印到 stdout 的消息#5不会按预期显示。

当然,这里描述的方法可以被概括,例如临时附加日志过滤器。注意,上面的代码在Python 2和Python 3中都有效。