zookeeper

简介

zookeeper为分布式系统调度框架, 用于解决分布式应用中数据管理问题,比如同步锁,分布式应用配置管理等

安装&配置

  • 下载
  • 修改配置
    在conf目录将zoo_sample.cfg拷贝为zoo.cfg, 该文件为zookeeper启动的默认配置文件
    zoo.cfg文件内容:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    tickTime=2000					#zookeeper服务器或客户端与服务器之间发送心跳的时间间隔(2s)
    initLimit=10 #zookeeper集群中连接到leader的fllower的时间不能超过多少个tickTime时间长度,若超过则连接失败
    syncLimit=5 #zookeeper集群中leader和fllower之间发送消息,请求和响应的时间不能超过多少个tickTime时间长度
    dataDir=/tmp/zookeeper #zookeeper数据存放位置
    clientPort=2181 #zookeeper客户端连接的端口号
    #maxClientCnxns=60 #最大连接数量
    #autopurge.snapRetainCount=3
    #autopurge.purgeInterval=1
    server.A=ipA:portA1:portA2 #表示集群中服务器地址, A:服务器标识, ipA:服务器ip地址, portA1:该服务器与leader通信的端口号, portA2: 当leader挂掉,用来选举使用的端口号
    server.B=ipB:portB1:portB2
在单机部署时至需要配置tickTime,dataDir,clientPort配置即可

在集群部署时需要再设置initLimit,syncLimit,server.*等配置, 同时需要在dataDir目录下新建myid文件, 内容为服务器的标识
  • 启动服务器
    cd ./bin && start zkServer.cmd

  • 启动客户端
    cd ./bin && start zkCli.cmd -server localhost:2181

  • 集群部署
    1.将zookeeper文件夹拷贝多份,修改zoo.cfg中的clientPort和dataDir
    2.修改dataDir目录下myid文件内容为该服务器的标识
    3.依次启动zookeeper服务器

实例

使用python实现分布式共享锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/usr/bin/env python
#encoding: utf-8

import logging
import threading

import time

from kazoo.client import KazooClient

_logger = logging.getLogger('zlock')

class ZKClient(KazooClient):

_instance = None

def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(ZKClient, cls).__new__(cls)
return cls._instance

def __init__(self, *args, **kwargs):
super(ZKClient, self).__init__(*args, **kwargs)
self.start()


class Zlock(object):

def __init__(self):
self._zkclient = ZKClient(hosts='localhost:2281')
self._lock_nameservice = '/uk/lock'
self._lock_prefix = 'lock_id_'
self._lock_id = ''
self._event = threading.Event()

def acquire(self, wait=True):
if self._lock_id == '':
self._lock_id = self._zkclient.create('%s/%s' % (self._lock_nameservice, self._lock_prefix), str(time.time()), ephemeral=True, sequence=True, makepath=True)

if not self.locked():
return True
elif wait:
self._event.wait()
else:
return False

def _watch(self, *args, **kwargs):
if not self.locked():
self._event.set()


def release(self):
self._zkclient.exists(self._lock_id) and self._zkclient.delete(self._lock_id)
self._lock_id = ''
self._event.clear()

def locked(self):
if self._zkclient.exists(self._lock_nameservice):
_children = self._zkclient.get_children(self._lock_nameservice, self._watch)
if self._lock_id != '':
_children.sort()
return '%s/%s' % (self._lock_nameservice, _children[0]) != self._lock_id
else:
return len(_children) != 0
else:
return False

if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)s %(levelname)s:%(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
_lock = Zlock()
for i in xrange(10):
print _lock.acquire(True)
_lock.release()
pass