Decorators in Python

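Decorators are widely used in Python. The basic idea is to wrap extra operations around a function, before and after it runs, to achieve effects such as logging, measuring execution time, or storing intermediate results. This article uses a few examples to show the basic usage of decorators and where they are useful.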

1. Introduction

Most articles online that explain decorators use the Fibonacci sequence as their example. Here is the basic function for computing Fibonacci numbers:

def fib(n):
    # fib(1) = 0, fib(2) = 1, fib(n) = fib(n - 1) + fib(n - 2)
    if n <= 2:
        return n - 1
    else:
        return fib(n - 1) + fib(n - 2)

The code above computes the n-th Fibonacci number recursively. This recursion actually repeats a lot of work. Here is the computation of fib(5) broken down:

              fib(5)
             /      \
         fib(4)      fib(3)
         /    \      /    \
     fib(3)  fib(2) fib(2) fib(1)
     /    \
 fib(2)  fib(1)

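The tree shows that fib(3) is computed twice, fib(2) three times, and fib(1) twice. If the intermediate results of the recursion were stored, a lot of time could be saved. Here we use a decorator to store these intermediate results. The code first: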

from functools import wraps


def cache(f):
    # cache_dict lives in the enclosing scope of _cache: one dict per decorated function
    cache_dict = {}

    @wraps(f)
    def _cache(n):
        if n in cache_dict:
            return cache_dict[n]
        else:
            cache_dict[n] = f(n)
            return cache_dict[n]

    return _cache


@cache
def fib(n):
    if n <= 2:
        return n - 1
    else:
        return fib(n - 1) + fib(n - 2)

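The decorator defines a dict that stores the i-th Fibonacci number. It is not a true global variable. It lives in the enclosing scope of _cache, and each decorated function gets its own dict. Before computing fib(i), _cache checks whether the value is already in the dict. If it is, the cached value is returned directly. Otherwise fib(i) is computed and written into the dict.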
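The sketch below makes the saving concrete by counting how often the function body runs with and without the cache. The counter dict `calls` and the names `fib_plain`, `fib_cached` and `fib_lru` are hypothetical additions for illustration. `functools.lru_cache` is the standard-library decorator for the same kind of memoization and is included for comparison.

```python
from functools import lru_cache, wraps


def cache(f):
    # same memoizing decorator as above: cache_dict lives in the closure
    cache_dict = {}

    @wraps(f)
    def _cache(n):
        if n not in cache_dict:
            cache_dict[n] = f(n)
        return cache_dict[n]

    return _cache


calls = {"plain": 0, "cached": 0, "lru": 0}  # hypothetical call counters


def fib_plain(n):
    calls["plain"] += 1
    return n - 1 if n <= 2 else fib_plain(n - 1) + fib_plain(n - 2)


@cache
def fib_cached(n):
    calls["cached"] += 1
    return n - 1 if n <= 2 else fib_cached(n - 1) + fib_cached(n - 2)


@lru_cache(maxsize=None)  # standard-library memoization, for comparison
def fib_lru(n):
    calls["lru"] += 1
    return n - 1 if n <= 2 else fib_lru(n - 1) + fib_lru(n - 2)


print(fib_plain(20), fib_cached(20), fib_lru(20))  # 4181 4181 4181
print(calls)  # {'plain': 13529, 'cached': 20, 'lru': 20}
```

For n = 20, the plain version runs its body 13529 times. Both cached versions run it only once per distinct n.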

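This is how a decorator can cache intermediate values to avoid repeated computation. Next, let's look at how decorators actually work and at the lifetime of the variables they hold.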

2. How decorators work

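The Fibonacci example shows that a decorator is simply a function that takes a function as its argument and returns a function. In general it looks like this: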

def decorator(f):
    def _wrap(*args, **kwargs):
        # do something before the call
        result = f(*args, **kwargs)
        # do something after the call
        return result
    return _wrap


@decorator
def foo(*args):
    # do something
    pass

The returned function contains the function to be run and performs some operations before and after it. So what actually happens when we call foo(args)?

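The line @decorator above def foo is shorthand for foo = decorator(foo). decorator(f) runs once, when the def statement executes, and the name foo is rebound to the returned _wrap. Every later call foo(args) therefore calls _wrap(args). The overall call is equivalent to decorator(f)(args), where f is the original, undecorated foo. To check this, we modify the Fibonacci example above and run the following statements: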

print(fib(20))
print(cache(fib)(20))

# output
4181
4181

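Both calls give the same result, which is consistent with this explanation of the call order. To understand the call order better, we modify the example from the introduction and add a second decorator: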

import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def cache(f):
    cache_dict = {"test": "foo"}

    @wraps(f)
    def _cache(n):
        if n in cache_dict:
            return cache_dict[n]
        else:
            cache_dict[n] = f(n)
            return cache_dict[n]

    return _cache


def record(f):
    # logs how long each call of f takes
    @wraps(f)
    def _wrap(n):
        start_time = time.time()
        result = f(n)
        end_time = time.time()
        logger.info('f_name:%s, n:%s, cost_time:%s', f.__name__, n, end_time - start_time)
        return result

    return _wrap


@record
@cache
def fib(n):
    if n <= 2:
        return n - 1
    else:
        return fib(n - 1) + fib(n - 2)

We added a record decorator that logs how long each call takes. Calling fib(5) first gives:

2017-12-05 21:03:12,115 [140735241039872] - [decorate_learn.py 32] INFO n:2, cost_time:9.53674316406e-07
2017-12-05 21:03:12,115 [140735241039872] - [decorate_learn.py 32] INFO n:1, cost_time:3.09944152832e-06
2017-12-05 21:03:12,115 [140735241039872] - [decorate_learn.py 32] INFO n:3, cost_time:0.000406980514526
2017-12-05 21:03:12,115 [140735241039872] - [decorate_learn.py 32] INFO n:2, cost_time:2.14576721191e-06
2017-12-05 21:03:12,116 [140735241039872] - [decorate_learn.py 32] INFO n:4, cost_time:0.000722885131836
2017-12-05 21:03:12,116 [140735241039872] - [decorate_learn.py 32] INFO n:3, cost_time:3.09944152832e-06
2017-12-05 21:03:12,116 [140735241039872] - [decorate_learn.py 32] INFO n:5, cost_time:0.00133514404297
3

The time of every fib(n) call is logged. Following the conclusion about call order above, we also run record(cache(fib))(5) and get:

2017-12-05 21:09:35,869 [140735241039872] - [decorate_learn.py 32] INFO n:2, cost_time:2.86102294922e-06
2017-12-05 21:09:35,869 [140735241039872] - [decorate_learn.py 32] INFO n:1, cost_time:3.09944152832e-06
2017-12-05 21:09:35,869 [140735241039872] - [decorate_learn.py 32] INFO n:3, cost_time:0.000430107116699
2017-12-05 21:09:35,869 [140735241039872] - [decorate_learn.py 32] INFO n:2, cost_time:1.90734863281e-06
2017-12-05 21:09:35,870 [140735241039872] - [decorate_learn.py 32] INFO n:4, cost_time:0.000657081604004
2017-12-05 21:09:35,870 [140735241039872] - [decorate_learn.py 32] INFO n:3, cost_time:2.14576721191e-06
2017-12-05 21:09:35,870 [140735241039872] - [decorate_learn.py 32] INFO n:5, cost_time:0.00082802772522
2017-12-05 21:09:35,870 [140735241039872] - [decorate_learn.py 32] INFO n:5, cost_time:0.000908136367798
3
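The @ syntax can also be checked without timing logs. In this sketch, `tag` is a hypothetical decorator factory. It records when it decorates a function and when its wrapper is entered and left. Stacking @tag("record") above @tag("cache") behaves like record(cache(f)). The decorators are applied bottom-up, once, when def runs, and at call time the outermost wrapper runs first.

```python
from functools import wraps

events = []  # what happened, in order


def tag(name):
    """Hypothetical decorator factory that records decoration and calls."""
    def decorator(f):
        events.append(f"decorate {name}")  # runs once, when the def statement executes

        @wraps(f)
        def _wrap(*args, **kwargs):
            events.append(f"enter {name}")
            result = f(*args, **kwargs)
            events.append(f"exit {name}")
            return result

        return _wrap
    return decorator


@tag("record")
@tag("cache")
def square(x):
    events.append("body")
    return x * x


print(events)     # ['decorate cache', 'decorate record']  (nothing called yet)
events.clear()
print(square(3))  # 9
print(events)     # ['enter record', 'enter cache', 'body', 'exit cache', 'exit record']
```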

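In the record(cache(fib))(5) output, every line matches the previous run except the final n:5 line. That line is logged by the outer record wrapper. The fib being wrapped here is already decorated, so the call is effectively record(cache(record(cache(fib))))(5), and the inner chain logs exactly as before.
That covers the call flow. Next we look at the lifetime of the variables inside a decorator. In the Fibonacci example we defined the dict cache_dict. When is it created, and when is it destroyed? We run the following experiment: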

import importlib

import decorate_learn


for i in range(5):
    decorate_learn.fib(i + 1)

# re-execute the module, which runs the @cache decoration again
importlib.reload(decorate_learn)

for i in range(5):
    decorate_learn.fib(i + 1)

We also change the decorator slightly so that it logs cache_dict on every call:

def cache(f):
    cache_dict = {"test": "foo"}

    @wraps(f)
    def _cache(n):
        logger.info('n:%s,cache_dict:%s', n, cache_dict)
        if n in cache_dict.keys():
            return cache_dict[n]
        else:
            cache_dict[n] = f(n)
            return cache_dict[n]

    return _cache

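We found no convenient way to show directly when a variable is created and destroyed. Instead, we log the variable every time the decorator runs and check whether its contents get cleared and rebuilt. The log output: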

2017-12-06 09:45:07,733 [140735241039872] - [decorate_learn.py 16] INFO n:1,cache_dict:{'test': 'foo'}
2017-12-06 09:45:07,733 [140735241039872] - [decorate_learn.py 16] INFO n:2,cache_dict:{'test': 'foo', 1: 0}
2017-12-06 09:45:07,733 [140735241039872] - [decorate_learn.py 16] INFO n:3,cache_dict:{'test': 'foo', 1: 0, 2: 1}
2017-12-06 09:45:07,733 [140735241039872] - [decorate_learn.py 16] INFO n:2,cache_dict:{'test': 'foo', 1: 0, 2: 1}
2017-12-06 09:45:07,733 [140735241039872] - [decorate_learn.py 16] INFO n:1,cache_dict:{'test': 'foo', 1: 0, 2: 1}
2017-12-06 09:45:07,733 [140735241039872] - [decorate_learn.py 16] INFO n:4,cache_dict:{'test': 'foo', 1: 0, 2: 1, 3: 1}
2017-12-06 09:45:07,733 [140735241039872] - [decorate_learn.py 16] INFO n:3,cache_dict:{'test': 'foo', 1: 0, 2: 1, 3: 1}
2017-12-06 09:45:07,733 [140735241039872] - [decorate_learn.py 16] INFO n:2,cache_dict:{'test': 'foo', 1: 0, 2: 1, 3: 1}
2017-12-06 09:45:07,733 [140735241039872] - [decorate_learn.py 16] INFO n:5,cache_dict:{'test': 'foo', 1: 0, 2: 1, 3: 1, 4: 2}
2017-12-06 09:45:07,733 [140735241039872] - [decorate_learn.py 16] INFO n:4,cache_dict:{'test': 'foo', 1: 0, 2: 1, 3: 1, 4: 2}
2017-12-06 09:45:07,733 [140735241039872] - [decorate_learn.py 16] INFO n:3,cache_dict:{'test': 'foo', 1: 0, 2: 1, 3: 1, 4: 2}
2017-12-06 09:45:07,734 [140735241039872] - [decorate_learn.py 16] INFO n:1,cache_dict:{'test': 'foo'}
2017-12-06 09:45:07,734 [140735241039872] - [decorate_learn.py 16] INFO n:2,cache_dict:{'test': 'foo', 1: 0}
2017-12-06 09:45:07,735 [140735241039872] - [decorate_learn.py 16] INFO n:3,cache_dict:{'test': 'foo', 1: 0, 2: 1}
2017-12-06 09:45:07,735 [140735241039872] - [decorate_learn.py 16] INFO n:2,cache_dict:{'test': 'foo', 1: 0, 2: 1}
2017-12-06 09:45:07,735 [140735241039872] - [decorate_learn.py 16] INFO n:1,cache_dict:{'test': 'foo', 1: 0, 2: 1}
2017-12-06 09:45:07,735 [140735241039872] - [decorate_learn.py 16] INFO n:4,cache_dict:{'test': 'foo', 1: 0, 2: 1, 3: 1}
2017-12-06 09:45:07,736 [140735241039872] - [decorate_learn.py 16] INFO n:3,cache_dict:{'test': 'foo', 1: 0, 2: 1, 3: 1}
2017-12-06 09:45:07,738 [140735241039872] - [decorate_learn.py 16] INFO n:2,cache_dict:{'test': 'foo', 1: 0, 2: 1, 3: 1}
2017-12-06 09:45:07,738 [140735241039872] - [decorate_learn.py 16] INFO n:5,cache_dict:{'test': 'foo', 1: 0, 2: 1, 3: 1, 4: 2}
2017-12-06 09:45:07,739 [140735241039872] - [decorate_learn.py 16] INFO n:4,cache_dict:{'test': 'foo', 1: 0, 2: 1, 3: 1, 4: 2}
2017-12-06 09:45:07,739 [140735241039872] - [decorate_learn.py 16] INFO n:3,cache_dict:{'test': 'foo', 1: 0, 2: 1, 3: 1, 4: 2}

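The log shows that within one run, cache_dict lives as long as the decorated fib. It keeps growing across calls and is reset only after reload(decorate_learn). Reloading re-executes the module, so the @cache line runs again and builds a new dict. In other words, cache_dict is created once per decoration and stays alive as long as the wrapper function that refers to it. In this experiment, that means until the module is reloaded or the program exits.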
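You can also see where cache_dict lives, without logging, by looking inside the wrapper's closure. This sketch relies on CPython's function attributes `__closure__` and `__code__.co_freevars`. The helper `closure_vars` is hypothetical. The last step calls cache again on the undecorated function (`fib.__wrapped__`, which functools.wraps sets). This mimics what happens on reload: a new wrapper with a new, empty dict.

```python
from functools import wraps


def cache(f):
    cache_dict = {}  # created once per decoration

    @wraps(f)
    def _cache(n):
        if n not in cache_dict:
            cache_dict[n] = f(n)
        return cache_dict[n]

    return _cache


@cache
def fib(n):
    return n - 1 if n <= 2 else fib(n - 1) + fib(n - 2)


def closure_vars(func):
    """Map func's free-variable names to the objects held in its closure cells."""
    return {name: cell.cell_contents
            for name, cell in zip(func.__code__.co_freevars, func.__closure__)}


for i in range(5):
    fib(i + 1)

cached = closure_vars(fib)["cache_dict"]
print(sorted(cached))  # [1, 2, 3, 4, 5]: the dict persisted across calls

# decorating again (as a reload does) builds a brand-new, empty dict
fib_again = cache(fib.__wrapped__)
print(closure_vars(fib_again)["cache_dict"])            # {}
print(closure_vars(fib_again)["cache_dict"] is cached)  # False
```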
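So far we have looked at how decorators are written and at some of the basic principles behind them. Next is a way to write decorators as classes.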

3. Another way to write decorators

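Besides the function form, a decorator can also be written as a class that implements the __call__ method. Using the Fibonacci sequence again, the code is: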

import time
from functools import wraps


class cache(object):
    def __init__(self):
        # each cache() instance owns its own dict
        self.cache_dict = {}

    def __call__(self, f):
        @wraps(f)
        def _wrap(n):
            print(self.cache_dict)
            if n in self.cache_dict:
                return self.cache_dict[n]
            else:
                self.cache_dict[n] = f(n)
                return self.cache_dict[n]
        return _wrap


class record(object):
    def __init__(self):
        pass

    def __call__(self, f):
        @wraps(f)
        def _wrap(n):
            start_time = time.time()
            result = f(n)
            end_time = time.time()
            print('f_name:%s, n:%s, cost_time:%s' % (f.__name__, n, end_time - start_time))
            return result
        return _wrap


@record()
@cache()
def fib(n):
    if n <= 2:
        return n - 1
    else:
        return fib(n - 1) + fib(n - 2)


@record()
@cache()
def foo(n):
    print("foo")

# print(fib(1))
print(fib(20))
foo(1)

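One point in the code needs explaining: we use functools.wraps. It copies the metadata of the wrapped function (such as __name__, __doc__ and __module__) onto the wrapper and sets __wrapped__. As a result, the decorated function still looks like the original one.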

# with wraps
fib(1)
print(fib.__name__)

# output
# fib

# without wraps
fib(1)
print(fib.__name__)

# output
# _wrap

As shown above, with wraps the decorated function keeps its original name.
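Here is a minimal side-by-side comparison, assuming Python 3. The two hypothetical decorators differ only in whether they apply functools.wraps. Besides `__name__`, wraps also copies the docstring and sets `__wrapped__`, which gives access to the undecorated function.

```python
from functools import wraps


def plain_decorator(f):
    def _wrap(*args, **kwargs):
        return f(*args, **kwargs)
    return _wrap


def wrapped_decorator(f):
    @wraps(f)  # copies __name__, __doc__, __module__, __qualname__, ... and sets __wrapped__
    def _wrap(*args, **kwargs):
        return f(*args, **kwargs)
    return _wrap


@plain_decorator
def fib_a(n):
    """Return the n-th Fibonacci number."""
    return n - 1 if n <= 2 else fib_a(n - 1) + fib_a(n - 2)


@wrapped_decorator
def fib_b(n):
    """Return the n-th Fibonacci number."""
    return n - 1 if n <= 2 else fib_b(n - 1) + fib_b(n - 2)


print(fib_a.__name__, fib_a.__doc__)  # _wrap None
print(fib_b.__name__, fib_b.__doc__)  # fib_b Return the n-th Fibonacci number.
print(fib_b.__wrapped__(5))           # 3, via the undecorated function
```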

Summary

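This post briefly introduced how decorators are implemented, along with a few small experiments of my own, written down as a note for future reference.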

A thread-safe singleton

The usual way to construct a singleton

public class SingletonConsumer {
    private final static Logger logger = Logger.getLogger(SingletonConsumer.class);
    private static SingletonConsumer instance;

    private SingletonConsumer() {
    }

    // static, so callers can use SingletonConsumer.getInstance()
    public static SingletonConsumer getInstance() {
        if (instance == null) {
            logger.debug("instance is null, trying to instantiate a new one");
            instance = new SingletonConsumer();
        } else {
            logger.debug("instance is not null, return the already-instantiated one");
        }
        return instance;
    }
}

This construction is safe in a single thread, but in a multithreaded setting it causes all kinds of problems. We design an experiment to see what goes wrong.

Experiment

In the experiment, 10 threads try to obtain a SingletonConsumer instance. At the end we check how many instances were actually created.

@Test
public void singletonConsumerTest() throws InterruptedException {
    ExecutorService executors = Executors.newFixedThreadPool(10);
    // HashSet is not thread-safe; a concurrent set avoids losing adds made by 10 threads at once
    Set<SingletonConsumer> set = ConcurrentHashMap.newKeySet();
    for (int i = 0; i < 10; i++) {
        executors.execute(
            () -> set.add(SingletonConsumer.getInstance())
        );
    }
    executors.shutdown();
    executors.awaitTermination(1, TimeUnit.HOURS);
    Assert.assertEquals(10, set.size());
}

Running the test prints:

2017-02-26 13:50:52 DEBUG SingletonConsumer:20 - instance is null, trying to instantiate a new one
2017-02-26 13:50:52 DEBUG SingletonConsumer:20 - instance is null, trying to instantiate a new one
2017-02-26 13:50:52 DEBUG SingletonConsumer:20 - instance is null, trying to instantiate a new one
2017-02-26 13:50:52 DEBUG SingletonConsumer:20 - instance is null, trying to instantiate a new one
2017-02-26 13:50:52 DEBUG SingletonConsumer:20 - instance is null, trying to instantiate a new one
2017-02-26 13:50:52 DEBUG SingletonConsumer:20 - instance is null, trying to instantiate a new one
2017-02-26 13:50:52 DEBUG SingletonConsumer:20 - instance is null, trying to instantiate a new one
2017-02-26 13:50:52 DEBUG SingletonConsumer:20 - instance is null, trying to instantiate a new one
2017-02-26 13:50:52 DEBUG SingletonConsumer:20 - instance is null, trying to instantiate a new one
2017-02-26 13:50:52 DEBUG SingletonConsumer:20 - instance is null, trying to instantiate a new one
set size:10

In fact ten SingletonConsumer instances were created. How do we build a thread-safe singleton?
The first idea is to wrap the body of getInstance in a synchronized block, so that only one thread at a time can execute it. The code becomes:

public static SingletonConsumer getInstance() {
    synchronized (SingletonConsumer.class) {
        if (instance == null) {
            logger.debug("instance is null, trying to instantiate a new one");
            instance = new SingletonConsumer();
        } else {
            logger.debug("instance is not null, return the already-instantiated one");
        }
        return instance;
    }
}

Run the test again:

2017-02-26 14:01:12 DEBUG SingletonConsumer:21 - instance is null, trying to instantiate a new one
2017-02-26 14:01:12 DEBUG SingletonConsumer:24 - instance is not null, return the already-instantiated one
2017-02-26 14:01:12 DEBUG SingletonConsumer:24 - instance is not null, return the already-instantiated one
2017-02-26 14:01:12 DEBUG SingletonConsumer:24 - instance is not null, return the already-instantiated one
2017-02-26 14:01:12 DEBUG SingletonConsumer:24 - instance is not null, return the already-instantiated one
2017-02-26 14:01:12 DEBUG SingletonConsumer:24 - instance is not null, return the already-instantiated one
2017-02-26 14:01:12 DEBUG SingletonConsumer:24 - instance is not null, return the already-instantiated one
2017-02-26 14:01:12 DEBUG SingletonConsumer:24 - instance is not null, return the already-instantiated one
2017-02-26 14:01:12 DEBUG SingletonConsumer:24 - instance is not null, return the already-instantiated one
2017-02-26 14:01:12 DEBUG SingletonConsumer:24 - instance is not null, return the already-instantiated one
java.lang.AssertionError:
Expected :10
Actual :1

This time only one SingletonConsumer instance was created, so this approach works.

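On closer inspection, however, this approach has an efficiency problem. Suppose thread A is inside the synchronized block checking whether instance is null. Every other thread has to wait until A is done before it can check. But many threads could safely check whether instance is null at the same time, so we change the code to the following: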

if (instance == null) {
    synchronized (SingletonConsumer.class) {
        logger.debug("instance is null, trying to instantiate a new one");
        instance = new SingletonConsumer();
    }
} else {
    logger.debug("instance is not null, return the already-instantiated one");
}
return instance;

In the code above, the instance == null check is moved outside the synchronized block. Is this version thread-safe? Run the test again and look at the result:

2017-02-26 14:14:18 DEBUG SingletonConsumer:28 - instance is null, trying to instantiate a new one
2017-02-26 14:14:18 DEBUG SingletonConsumer:28 - instance is null, trying to instantiate a new one
2017-02-26 14:14:18 DEBUG SingletonConsumer:28 - instance is null, trying to instantiate a new one
2017-02-26 14:14:18 DEBUG SingletonConsumer:28 - instance is null, trying to instantiate a new one
2017-02-26 14:14:18 DEBUG SingletonConsumer:28 - instance is null, trying to instantiate a new one
2017-02-26 14:14:18 DEBUG SingletonConsumer:28 - instance is null, trying to instantiate a new one
2017-02-26 14:14:18 DEBUG SingletonConsumer:28 - instance is null, trying to instantiate a new one
2017-02-26 14:14:18 DEBUG SingletonConsumer:28 - instance is null, trying to instantiate a new one
2017-02-26 14:14:18 DEBUG SingletonConsumer:28 - instance is null, trying to instantiate a new one
2017-02-26 14:14:18 DEBUG SingletonConsumer:28 - instance is null, trying to instantiate a new one
set size:10

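The result shows that 10 SingletonConsumer instances were created again.
Consider this scenario. instance is initially null. Thread A checks instance, finds it null, and then its time slice runs out. Thread B now checks instance and also finds it null. Control returns to thread A, which enters the synchronized block and creates a SingletonConsumer. When A leaves the block, thread B enters and creates another one. Repeated across threads, this produces the result of the experiment above. To fix it, instance must be checked for null again inside the synchronized block. The final code is: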

public static SingletonConsumer getInstance() {
    if (instance == null) {
        synchronized (SingletonConsumer.class) {
            if (instance == null) {
                logger.debug("instance is null, trying to instantiate a new one");
                instance = new SingletonConsumer();
            } else {
                logger.debug("instance is not null, return the already-instantiated one");
            }
        }
    } else {
        logger.debug("instance is not null, return the already-instantiated one");
    }
    return instance;
}
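Here is a rough Python sketch of the same check-lock-check structure for comparison, with threading.Lock standing in for synchronized. The class and function names are hypothetical. The `time.sleep` call is an artificial delay that widens the race window, so the version without the inner check reliably creates more than one instance. Python has no volatile keyword. In standard CPython builds with the global interpreter lock, the reordering problem discussed below does not arise in the same form, so this sketch only illustrates the double check.

```python
import threading
import time


class _Counted:
    created = 0  # number of constructor calls, tracked per subclass

    def __init__(self):
        type(self).created += 1


class CheckOutsideOnly(_Counted):
    """Flawed variant: instance is checked only outside the lock."""
    _instance = None
    _lock = threading.Lock()

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            time.sleep(0.1)  # artificial delay between check and lock
            with cls._lock:
                cls._instance = cls()  # no second check: every waiting thread creates one
        return cls._instance


class DoubleChecked(_Counted):
    """Double-checked locking: check, lock, check again."""
    _instance = None
    _lock = threading.Lock()

    @classmethod
    def get_instance(cls):
        if cls._instance is None:          # first check, without the lock (fast path)
            time.sleep(0.1)                # same artificial delay
            with cls._lock:
                if cls._instance is None:  # second check, under the lock
                    cls._instance = cls()
        return cls._instance


def hammer(getter, n_threads=10):
    """Release n_threads at the same moment, each calling getter()."""
    barrier = threading.Barrier(n_threads)

    def worker():
        barrier.wait()
        getter()

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


hammer(CheckOutsideOnly.get_instance)
hammer(DoubleChecked.get_instance)
print(CheckOutsideOnly.created, DoubleChecked.created)
```

CheckOutsideOnly usually reports 10 constructions, one for each thread that passed the unlocked check. DoubleChecked reports 1.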

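One last point: the JIT compiler and the CPU may reorder instructions, so the statement below does not execute as one indivisible step. It roughly consists of allocating memory, running the constructor, and assigning the reference to instance. The assignment may become visible to other threads before the constructor has finished. A thread that passes the outer instance == null check at that moment can then get a partially initialized instance, which leads to subtle errors: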

instance = new SingletonConsumer();

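The fix is to declare instance as volatile (private static volatile SingletonConsumer instance;). Under the Java memory model (Java 5 and later), volatile guarantees visibility and ordering, which rules out this reordering. The details are left for another post.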

Summary

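In short, a thread-safe singleton needs to get these three things right:

1. getInstance needs the synchronized keyword.

2. For efficiency, the instance == null check can be moved outside the synchronized block, but the check inside the block must be kept.

3. instance must be declared volatile.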

Code: GitHub

A first look at the producer-consumer model

This article builds a simple producer-consumer model and uses it to learn some multithreaded programming.
Code: GitHub

1. The producer-consumer model

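We won't repeat the basic idea of the producer-consumer model here. In this experiment, one producer reads a file line by line and puts the lines into a queue. Ten consumers take the data from the queue and write it back to another file.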

1.1 starter

First we write a method that initializes the producer and consumer threads:

public static void startMutiTheads(String inputPath, String outPath) {
    LocalDateTime startTime = LocalDateTime.now();
    List<Thread> threads = new ArrayList<>();
    LinkedBlockingQueue<Optional<String>> queue = new LinkedBlockingQueue<>();
    FileUtil.clearFileContents(outPath);
    // argument order matches the constructors Producer(queue, inputFile) and Consumer(queue, outputFile)
    Producer producer = new Producer(queue, inputPath);
    threads.add(new Thread(producer));
    Consumer consumer = new Consumer(queue, outPath);
    for (int i = 0; i < Consumer.consumerThreadCount; i++) {
        Thread consumerThread = new Thread(consumer);
        threads.add(consumerThread);
        producer.addConsumer(consumer);
    }
    threads.forEach(Thread::start);
    threads.forEach(thread -> {
        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    consumer.getFileUtil().flushAndClose();
    // get consumer's totalCount:
    logger.debug("Consumer's totalCount: " + consumer.getTotalCount());
    LocalDateTime endTime = LocalDateTime.now();
    logger.info(String.format("It takes %s seconds to finish", LocalDateTime.from(startTime).until(endTime, ChronoUnit.SECONDS)));
}

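The method takes two parameters: the path of the input file and the path of the output file.
It first creates a list that holds all the thread instances, and then the shared queue: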

LinkedBlockingQueue<Optional<String>> queue = new LinkedBlockingQueue<>();

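We use LinkedBlockingQueue from the java.util.concurrent package. The producer thread reads the file contents into this queue, and the consumer threads take the data from it and write it back to a file.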

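LinkedBlockingQueue is thread-safe and first-in, first-out. Its capacity can be specified or left out. If it is left out, the capacity defaults to Integer.MAX_VALUE. The main methods are put and take. put blocks while the queue is full, until an element is consumed. take blocks while the queue is empty, until an element is added. The Producer below uses add instead of put. On a full bounded queue, add throws IllegalStateException rather than blocking, but that is not an issue here because the queue is created without a capacity.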
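Python's queue.Queue has the same blocking behaviour, which makes it easy to try out. This is a rough analogy, not the Java API. put with a timeout shows that put blocks on a full bounded queue. A consumer thread blocked in get wakes up as soon as an item is put.

```python
import queue
import threading
import time

# bounded queue, similar to new LinkedBlockingQueue<>(2); maxsize=0 would mean unbounded
q = queue.Queue(maxsize=2)
q.put("a")
q.put("b")
try:
    q.put("c", timeout=0.1)  # queue is full: put blocks, here for at most 0.1 s
except queue.Full:
    print("put timed out: queue is full")
print(q.get())  # FIFO order: prints a

# a consumer blocked in get() wakes up as soon as an item is put
results = []
box = queue.Queue()  # unbounded
consumer = threading.Thread(target=lambda: results.append(box.get()))
consumer.start()
time.sleep(0.1)  # meanwhile the consumer is blocked inside get()
box.put("late item")
consumer.join()
print(results)  # ['late item']
```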

Next we create the producer thread and add it to the threads list:

Producer producer = new Producer(queue, inputPath);
threads.add(new Thread(producer));

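Then we create ten consumer threads. Consumer.consumerThreadCount is a static constant defined in Consumer with the value 10. All ten threads run the same Consumer instance, so they share its counter and its FileUtil: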

Consumer consumer = new Consumer(queue, outPath);
for (int i = 0; i < Consumer.consumerThreadCount; i++) {
    Thread consumerThread = new Thread(consumer);
    threads.add(consumerThread);
    producer.addConsumer(consumer);
}

All that is left is to start the threads:

threads.forEach(Thread::start);
threads.forEach(thread -> {
    try {
        thread.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

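thread.join() waits until that thread has finished. The starter therefore continues only after the producer and all consumers are done.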

That is all the starter needs to do.

1.2 Producer

This experiment uses a single producer. The basic code is:

public class Producer implements Runnable {
    private LinkedBlockingQueue<Optional<String>> queue;
    private String inputFile;

    public Producer(LinkedBlockingQueue<Optional<String>> queue, String inputFile) {
        this.queue = queue;
        this.inputFile = inputFile;
    }

    @Override
    public void run() {
        FileUtil.readFileLineByLine(inputFile, line -> {
            queue.add(Optional.of(line));
        });
        for (int i = 0; i < Consumer.consumerThreadCount; i++) {
            queue.add(Optional.empty());
        }
    }
}

The Producer class simply implements Runnable and overrides its run method. run first reads the file contents into queue, using a Java 8 lambda expression:

line -> { queue.add(Optional.of(line)); }

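FileUtil is a file utility class. readFileLineByLine reads the file line by line and passes each line to a callback, here the lambda that puts the line into queue. The callback parameter has the Java 8 functional interface type java.util.function.Consumer, not to be confused with the Consumer thread class in this project. We won't go into it here. Think of it as something that accepts a lambda expression and applies it to every argument passed to accept: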

public static void readFileLineByLine(String filePath, Consumer<String> consumer) {
    // Consumer here is java.util.function.Consumer<String>, not the Consumer Runnable below
    // try-with-resources closes the reader even if readLine throws
    try (BufferedReader br = new BufferedReader(
            new InputStreamReader(new FileInputStream(filePath), "Cp1252"))) {
        String line;
        while ((line = br.readLine()) != null) {
            consumer.accept(line);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

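The for loop that follows puts ten empty Optional objects into queue. This is how the Consumer threads are told to stop; the details are explained in the Consumer section. Optional is a Java 8 feature. The official documentation describes it as: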

A container object which may or may not contain a non-null value. If a value is present, isPresent() will return true and get() will return the value.

A more informal explanation:

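If you have written Java programs, you have probably run into this: you call a method on some data structure and get a return value, but you cannot pass it straight to another method. You first have to check whether it is null, and only if it is not can you use it as an argument. Java 8 added the Optional class to address exactly this problem.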

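Its detailed usage will be covered later. For this article, you only need to know the following. After the producer has put all the data into the queue, it adds 10 empty elements, and the consumers treat an empty element as the signal to stop.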

1.3 Consumer

The Consumer is also fairly simple. Like the Producer, it implements Runnable and overrides run:

public class Consumer implements Runnable {
    private AtomicInteger totalCount = new AtomicInteger(0);
    public static final int consumerThreadCount = 10;
    private FileUtil fileUtil;
    private final static Logger logger = Logger.getLogger(Consumer.class);
    private LinkedBlockingQueue<Optional<String>> queue;

    public Consumer(LinkedBlockingQueue<Optional<String>> queue, String outputFile) {
        this.queue = queue;
        this.fileUtil = new FileUtil(outputFile);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Optional<String> line = queue.take(); // blocks until an element is available
                if (!line.isPresent()) break;         // Optional.empty() is the stop signal
                totalCount.incrementAndGet();
                String processedLine = fileUtil.processLine(line.get());
                fileUtil.appendToFile(processedLine);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public FileUtil getFileUtil() {
        return fileUtil;
    }

    public int getTotalCount() {
        return totalCount.get();
    }
}

The main thing to look at is run. It loops with while, reading data from queue. As described above, queue.take() blocks until an element is available. run also calls FileUtil's processLine method:

public String processLine(String line) throws IOException {
    int min = 1;
    int max = 100;
    int randomMillisecconds = min + (int) (Math.random() * ((max - min) + 1));
    try {
        Thread.sleep(randomMillisecconds);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    // readLineCount is not accurate in multi-thread mode
    readLineCount++;
    logger.info(String.format("%d lines were read.", readLineCount));
    return line;
}

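processLine sleeps for a random 1 to 100 ms to simulate work, then counts the lines the Consumer has read and logs the count. The reason for this will become clear in the experiment.
One more thing about Consumer: its run method body is a while (true) loop, so it is not obvious when a Consumer thread stops. By design, the Producer reads the whole file line by line into queue, and the Consumers take data from queue and write it to another file. Once the producer has read the whole file and the Consumers have written all the data in queue back out, the Consumers should stop.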

if (!line.isPresent()) break;

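This line is what stops a Consumer thread. Recall that once the Producer has put all the data into queue, it adds ten Optional.empty() elements. The queue is FIFO, so a Consumer that gets Optional.empty() from queue.take() knows that every data line has already been taken, and it can stop. Each Consumer thread stops after taking exactly one empty element. With ten empty elements for ten threads, every thread gets its own stop signal. There are many other ways to stop a thread that runs in a loop; they are left for a later post. That is how the Consumer class is built.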
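The same stop-signal technique (often called a poison pill) can be sketched in Python. This is an analogy under stated assumptions, not the article's Java code. None plays the role of Optional.empty(), which is only safe because real items are never None. The names producer, consumer and N_CONSUMERS are hypothetical.

```python
import queue
import threading

N_CONSUMERS = 10  # mirrors Consumer.consumerThreadCount
SENTINEL = None   # plays the role of Optional.empty()


def producer(lines, q):
    for line in lines:
        q.put(line)
    for _ in range(N_CONSUMERS):  # one stop signal per consumer
        q.put(SENTINEL)


def consumer(q, out, out_lock):
    while True:
        item = q.get()            # blocks like LinkedBlockingQueue.take()
        if item is SENTINEL:      # this thread's stop signal
            break
        with out_lock:
            out.append(item)


lines = [f"line {i}" for i in range(3000)]
q = queue.Queue()
out, out_lock = [], threading.Lock()
threads = [threading.Thread(target=producer, args=(lines, q))]
threads += [threading.Thread(target=consumer, args=(q, out, out_lock))
            for _ in range(N_CONSUMERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(out), sorted(out) == sorted(lines))  # 3000 True
```

Each consumer exits after exactly one sentinel, so all ten threads terminate and join() returns. Items can arrive out of order, which is why the check compares sorted lists.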

2. Experiment

Test code:

@Test
public void multiThreadTest() throws IOException {
    starter.startMultiThreadTask(inputFile, outputFile);
    Assert.assertTrue(isFileDataSame(inputFile, outputFile));
}

The test output prints the total time and the two counters:

2017-03-01 13:52:04 INFO  FileUtil:118 - 2954 lines were read.
2017-03-01 13:52:04 INFO FileUtil:118 - 2955 lines were read.
2017-03-01 13:52:04 INFO FileUtil:118 - 2956 lines were read.
2017-03-01 13:52:04 INFO FileUtil:118 - 2957 lines were read.
2017-03-01 13:52:04 INFO FileUtil:118 - 2958 lines were read.
2017-03-01 13:52:04 DEBUG Starter:50 - Consumer's totalCount: 3000
2017-03-01 13:52:04 INFO Starter:53 - It takes 16 seconds to finish

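Lines such as "2958 lines were read." print the readLineCount variable from FileUtil's processLine. It is meant to count how many lines the Consumer threads have taken from queue. "Consumer's totalCount: 3000" prints the Consumer's totalCount field, which also counts how many lines the ten threads took from queue in total. The two values should be the same, but they clearly differ.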

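Looking at the code, readLineCount is a plain int while totalCount is an AtomicInteger, and that is where the problem lies. In Java, ++ is not thread-safe: it is a separate read, add and write. readLineCount is a field of the FileUtil instance that all consumer threads share. When several threads execute ++ at the same time, some increments are lost and readLineCount ends up lower than totalCount. A crude fix is to add synchronized to processLine: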

public String processLine(String line) throws IOException {
    // locks the FileUtil instance shared by all consumer threads,
    // so the whole body, including the sleep, runs one thread at a time
    synchronized (this) {
        int min = 1;
        int max = 100;
        int randomMillisecconds = min + (int) (Math.random() * ((max - min) + 1));
        try {
            Thread.sleep(randomMillisecconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // safe now: only one thread at a time reaches this increment
        readLineCount++;
        logger.info(String.format("%d lines were read.", readLineCount));
        return line;
    }
}

Run the test again:

2017-03-01 14:09:34 INFO  FileUtil:119 - 2994 lines were read.
2017-03-01 14:09:34 INFO FileUtil:119 - 2995 lines were read.
2017-03-01 14:09:34 INFO FileUtil:119 - 2996 lines were read.
2017-03-01 14:09:34 INFO FileUtil:119 - 2997 lines were read.
2017-03-01 14:09:35 INFO FileUtil:119 - 2998 lines were read.
2017-03-01 14:09:35 INFO FileUtil:119 - 2999 lines were read.
2017-03-01 14:09:35 INFO FileUtil:119 - 3000 lines were read.
2017-03-01 14:09:35 DEBUG Starter:50 - Consumer's totalCount: 3000
2017-03-01 14:09:35 INFO Starter:53 - It takes 160 seconds to finish

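Now the two values are equal, so this approach works, but the time went up from 16 s to 160 s. The reason is that the synchronized block also covers Thread.sleep, so the ten consumers process lines one at a time. 3000 lines at about 50 ms each on average take roughly 150 s. Ten threads sleeping in parallel need only about a tenth of that. Looking at processLine, the only real problem is readLineCount. The java.util.concurrent.atomic package provides AtomicInteger, which wraps an int and makes operations such as increment atomic. So we declare readLineCount as: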

private AtomicInteger readLineCount = new AtomicInteger(0);

processLine becomes:

public String processLine(String line) throws IOException {
    int min = 1;
    int max = 100;
    int randomMillisecconds = min + (int) (Math.random() * ((max - min) + 1));
    try {
        Thread.sleep(randomMillisecconds);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    // incrementAndGet is atomic; log its return value instead of calling get() again,
    // because another thread may increment in between
    int count = readLineCount.incrementAndGet();
    logger.info(String.format("%d lines were read.", count));
    return line;
}

Run the test again:

2017-03-01 14:18:23 INFO  FileUtil:120 - 2994 lines were read.
2017-03-01 14:18:23 INFO FileUtil:120 - 2995 lines were read.
2017-03-01 14:18:23 INFO FileUtil:120 - 2996 lines were read.
2017-03-01 14:18:23 INFO FileUtil:120 - 2997 lines were read.
2017-03-01 14:18:23 INFO FileUtil:120 - 2998 lines were read.
2017-03-01 14:18:23 INFO FileUtil:120 - 2999 lines were read.
2017-03-01 14:18:23 INFO FileUtil:120 - 3000 lines were read.
2017-03-01 14:18:23 DEBUG Starter:50 - Consumer's totalCount: 3000
2017-03-01 14:18:23 INFO Starter:53 - It takes 15 seconds to finish

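The two values are equal, and the run took only 15 s.
These experiments show that, here, the classes in java.util.concurrent are much faster than crudely wrapping the whole method in synchronized. The decisive factor is the scope of the lock. AtomicInteger makes only the increment atomic, while the synchronized block serialized the slow sleep as well.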
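The same lesson fits in a small Python sketch. Python has no AtomicInteger, so a hypothetical LineCounter protects only the read-modify-write with a lock, playing the role that incrementAndGet plays above. The wide=True run instead holds one lock around the simulated work as well, like the synchronized version of processLine. The sleep durations and thread counts are arbitrary assumptions chosen to keep the demo short.

```python
import random
import threading
import time


class LineCounter:
    """Hypothetical thread-safe counter: the lock covers only the increment."""

    def __init__(self):
        self._count = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:  # critical section: just the read-modify-write
            self._count += 1
            return self._count  # this thread's value, like incrementAndGet

    @property
    def value(self):
        with self._lock:
            return self._count


def process_line(counter, wide_lock=None):
    """Simulate processLine: 1-5 ms of 'work', then count the line."""
    if wide_lock is not None:
        with wide_lock:  # like synchronized around the whole method body
            time.sleep(random.uniform(0.001, 0.005))
            return counter.increment()
    time.sleep(random.uniform(0.001, 0.005))  # work runs in parallel
    return counter.increment()


def run(wide, n_threads=10, lines_per_thread=20):
    counter = LineCounter()
    wide_lock = threading.Lock() if wide else None

    def worker():
        for _ in range(lines_per_thread):
            process_line(counter, wide_lock)

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value, time.perf_counter() - start


narrow_count, narrow_time = run(wide=False)
wide_count, wide_time = run(wide=True)
print(narrow_count, wide_count)  # 200 200
print(f"narrow lock: {narrow_time:.2f} s, wide lock: {wide_time:.2f} s")
```

Both runs reach the same count, but the wide lock makes the sleeps run one after another. For the same reason, a synchronized block around only readLineCount++ would likely be about as fast as AtomicInteger in the Java version. What matters is keeping the slow work outside the critical section.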

3. Summary

We have tried out a simple producer-consumer model using a few classes from java.util.concurrent. Time was short, so this is all for now; more will be added later.
