大纲 前言 学习资源 版本说明 ZK 辅助源码分析 ZK 持久化源码 Leader 和 Follower 中的数据会在内存和磁盘中各保存一份,因此需要将内存中的数据持久化到磁盘(如下图所示)。`org.apache.zookeeper.server.persistence` 包下的类都是与持久化相关的代码,下面列出其中的 `SnapShot`(快照)与 `TxnLog`(事务日志)两个接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public interface SnapShot { long deserialize (DataTree dt, Map<Long, Integer> sessions) throws IOException ; void serialize (DataTree dt, Map<Long, Integer> sessions, File name) throws IOException ; File findMostRecentSnapshot () throws IOException ; void close () throws IOException ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public interface TxnLog { void setServerStats (ServerStats serverStats) ; void rollLog () throws IOException ; boolean append (TxnHeader hdr, Record r) throws IOException ; TxnIterator read (long zxid) throws IOException ; long getLastLoggedZxid () throws IOException ; boolean truncate (long zxid) throws IOException ; long getDbId () throws IOException ; void commit () throws IOException ; long getTxnLogSyncElapsedTime () ; void close () throws IOException ; public interface TxnIterator { TxnHeader getHeader () ; Record getTxn () ; boolean next () throws IOException ; void close () throws IOException ; long getStorageSize () throws IOException ; } }
ZK 序列化源码 `zookeeper-jute` 模块包含 ZooKeeper 序列化相关的源码,下面列出其中的 `Record`、`Index`、`OutputArchive` 和 `InputArchive` 四个接口。
1 2 3 4 5 6 7 8 9 public interface Record { public void serialize (OutputArchive archive, String tag) throws IOException ; public void deserialize (InputArchive archive, String tag) throws IOException ; }
/**
 * Minimal cursor with a "finished?" check and a "step forward" operation.
 *
 * <p>{@code InputArchive.startVector} and {@code InputArchive.startMap}
 * return this type. Typical use:
 * {@code for (; !idx.done(); idx.incr()) { ... }}.
 */
public interface Index {

    /**
     * @return {@code true} once there is nothing left to iterate over
     */
    public boolean done();

    /**
     * Advances the cursor by one element.
     */
    public void incr();
}
/**
 * Write side of the jute serialization API: one method per primitive kind,
 * plus bracket methods for nested records, vectors and maps.
 *
 * <p>Every method takes a {@code tag} naming the value being written; how a
 * concrete archive uses the tag is up to the implementation and is not
 * visible in this listing.
 */
public interface OutputArchive {

    // ---- scalar values ----
    public void writeByte(byte b, String tag) throws IOException;
    public void writeBool(boolean b, String tag) throws IOException;
    public void writeInt(int i, String tag) throws IOException;
    public void writeLong(long l, String tag) throws IOException;
    public void writeFloat(float f, String tag) throws IOException;
    public void writeDouble(double d, String tag) throws IOException;
    public void writeString(String s, String tag) throws IOException;
    public void writeBuffer(byte buf[], String tag) throws IOException;

    // ---- nested records ----
    /** Writes a complete nested record. */
    public void writeRecord(Record r, String tag) throws IOException;
    /** Opens a record; paired with {@link #endRecord(Record, String)}. */
    public void startRecord(Record r, String tag) throws IOException;
    /** Closes a record opened by {@link #startRecord(Record, String)}. */
    public void endRecord(Record r, String tag) throws IOException;

    // ---- collections ----
    /** Opens a vector; paired with {@link #endVector(List, String)}. */
    public void startVector(List<?> v, String tag) throws IOException;
    /** Closes a vector opened by {@link #startVector(List, String)}. */
    public void endVector(List<?> v, String tag) throws IOException;
    /** Opens a map; paired with {@link #endMap(TreeMap, String)}. */
    public void startMap(TreeMap<?,?> v, String tag) throws IOException;
    /** Closes a map opened by {@link #startMap(TreeMap, String)}. */
    public void endMap(TreeMap<?,?> v, String tag) throws IOException;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public interface InputArchive { public byte readByte (String tag) throws IOException ; public boolean readBool (String tag) throws IOException ; public int readInt (String tag) throws IOException ; public long readLong (String tag) throws IOException ; public float readFloat (String tag) throws IOException ; public double readDouble (String tag) throws IOException ; public String readString (String tag) throws IOException ; public byte [] readBuffer(String tag) throws IOException; public void readRecord (Record r, String tag) throws IOException ; public void startRecord (String tag) throws IOException ; public void endRecord (String tag) throws IOException ; public Index startVector (String tag) throws IOException ; public void endVector (String tag) throws IOException ; public Index startMap (String tag) throws IOException ; public void endMap (String tag) throws IOException ; }
ZK 服务端初始化源码分析
ZK 服务端启动脚本 ZooKeeper 服务端的启动命令是 `zkServer.sh start`,其中 `zkServer.sh` 服务端脚本的核心源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 #!/usr/bin/env bash ZOOBIN="${BASH_SOURCE-$0} " ZOOBIN="$(dirname "${ZOOBIN} " ) " ZOOBINDIR="$(cd "${ZOOBIN} " ; pwd) " if [ -e "$ZOOBIN /../libexec/zkEnv.sh" ]; then . "$ZOOBINDIR " /../libexec/zkEnv.sh else . "$ZOOBINDIR " /zkEnv.sh fi if [ "x$JMXLOCALONLY " = "x" ]then JMXLOCALONLY=false fi if [ "x$JMXDISABLE " = "x" ] || [ "$JMXDISABLE " = 'false' ]then echo "ZooKeeper JMX enabled by default" >&2 if [ "x$JMXPORT " = "x" ] then ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain" else if [ "x$JMXAUTH " = "x" ] then JMXAUTH=false fi if [ "x$JMXSSL " = "x" ] then JMXSSL=false fi if [ "x$JMXLOG4J " = "x" ] then JMXLOG4J=true fi echo "ZooKeeper remote JMX Port set to $JMXPORT " >&2 echo "ZooKeeper remote JMX authenticate set to $JMXAUTH " >&2 echo "ZooKeeper remote JMX ssl set to $JMXSSL " >&2 echo "ZooKeeper remote JMX log4j set to $JMXLOG4J " >&2 ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain" fi else echo "JMX disabled by user request" >&2 ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain" fi if [ "x$SERVER_JVMFLAGS " != "x" ]then JVMFLAGS="$SERVER_JVMFLAGS $JVMFLAGS " fi ...... case $1 in start) echo -n "Starting zookeeper ... " if [ -f "$ZOOPIDFILE " ]; then if kill -0 `cat "$ZOOPIDFILE " ` > /dev/null 2>&1; then echo $command already running as process `cat "$ZOOPIDFILE " `. 
exit 1 fi fi nohup "$JAVA " $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR} " \ "-Dzookeeper.log.file=${ZOO_LOG_FILE} " "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP} " \ -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \ -cp "$CLASSPATH " $JVMFLAGS $ZOOMAIN "$ZOOCFG " > "$_ZOO_DAEMON_OUT " 2>&1 < /dev/null & ...... ;; stop) echo -n "Stopping zookeeper ... " if [ ! -f "$ZOOPIDFILE " ] then echo "no zookeeper to stop (could not find file $ZOOPIDFILE )" else $KILL $(cat "$ZOOPIDFILE " ) rm "$ZOOPIDFILE " sleep 1 echo STOPPED fi exit 0 ;; restart) shift "$0 " stop ${@} sleep 3 "$0 " start ${@} ;; status) ..... ;; *) echo "Usage: $0 [--config <conf-dir>] {start|start-foreground|stop|restart|status|print-cmd}" >&2 esac
由于 `zkServer.sh start` 命令在底层实际执行的内容如下:
1 2 3 4 nohup "$JAVA " + 一堆提交参数 + $ZOOMAIN (org.apache.zookeeper.server.quorum.QuorumPeerMain) + "$ZOOCFG " (zkEnv.sh 文件中 ZOOCFG="zoo.cfg" )
所以 ZooKeeper 服务端的启动入口是 `QuorumPeerMain` 类(即 `org.apache.zookeeper.server.quorum.QuorumPeerMain` 的 `main()` 方法)。
ZK 服务端启动入口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package org.apache.zookeeper.server.quorum;public class QuorumPeerMain { public static void main (String[] args) { QuorumPeerMain main = new QuorumPeerMain(); try { main.initializeAndRun(args); } catch (IllegalArgumentException e) { ...... } LOG.info("Exiting normally" ); System.exit(0 ); } }
QuorumPeerMain.initializeAndRun()
方法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 protected void initializeAndRun (String[] args) throws ConfigException, IOException, AdminServerException { QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1 ) { config.parse(args[0 ]); } DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); if (args.length == 1 && config.isDistributed()) { runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode" ); ZooKeeperServerMain.main(args); } }
解析配置参数 提示
ZooKeeper 在启动时,会解析 `zoo.cfg` 和 `myid` 两个文件(只有集群模式才需要 `myid` 文件),从而获取核心配置参数。`zoo.cfg` 是配置文件,定义了整个 ZooKeeper 集群的配置参数,比如端口号、数据存储路径和服务器列表;`myid` 是节点标识文件,存放了当前节点的唯一编号,用于区分集群中的各个节点。
QuorumPeerConfig.parse()
方法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public void parse (String path) throws ConfigException { LOG.info("Reading configuration from: " + path); try { File configFile = (new VerifyingFileFactory.Builder(LOG) .warnForRelativePath() .failForNonExistingPath() .build()).create(path); Properties cfg = new Properties(); FileInputStream in = new FileInputStream(configFile); try { cfg.load(in); configFileStr = path; } finally { in.close(); } parseProperties(cfg); } catch (IOException e) { throw new ConfigException("Error processing " + path, e); } catch (IllegalArgumentException e) { throw new ConfigException("Error processing " + path, e); } ...... }
QuorumPeerConfig.parseProperties()
方法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public void parseProperties (Properties zkProp) throws IOException, ConfigException { int clientPort = 0 ; int secureClientPort = 0 ; String clientPortAddress = null ; String secureClientPortAddress = null ; VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build(); for (Entry<Object, Object> entry : zkProp.entrySet()) { String key = entry.getKey().toString().trim(); String value = entry.getValue().toString().trim(); if (key.equals("dataDir" )) { dataDir = vff.create(value); } else if (key.equals("dataLogDir" )) { dataLogDir = vff.create(value); } else if (key.equals("clientPort" )) { clientPort = Integer.parseInt(value); } else if (key.equals("localSessionsEnabled" )) { localSessionsEnabled = Boolean.parseBoolean(value); } else if (key.equals("localSessionsUpgradingEnabled" )) { localSessionsUpgradingEnabled = Boolean.parseBoolean(value); } else if (key.equals("clientPortAddress" )) { clientPortAddress = value.trim(); } else if (key.equals("secureClientPort" )) { secureClientPort = Integer.parseInt(value); } else if (key.equals("secureClientPortAddress" )){ secureClientPortAddress = value.trim(); } else if (key.equals("tickTime" )) { tickTime = Integer.parseInt(value); } else if (key.equals("maxClientCnxns" )) { maxClientCnxns = Integer.parseInt(value); } else if (key.equals("minSessionTimeout" )) { minSessionTimeout = Integer.parseInt(value); } else if (key.equals("maxSessionTimeout" )) { maxSessionTimeout = Integer.parseInt(value); } else if (key.equals("initLimit" )) { initLimit = Integer.parseInt(value); } else if (key.equals("syncLimit" )) { syncLimit = Integer.parseInt(value); } ...... } ...... if (dynamicConfigFileStr == null ) { setupQuorumPeerConfig(zkProp, true ); if (isDistributed() && isReconfigEnabled()) { backupOldConfig(); } } }
QuorumPeerConfig.setupQuorumPeerConfig()
方法1 2 3 4 5 6 7 8 void setupQuorumPeerConfig (Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException { quorumVerifier = parseDynamicConfig(prop, electionAlg, true , configBackwardCompatibilityMode); setupMyId(); setupClientPort(); setupPeerType(); checkValidity(); }
QuorumPeerConfig.setupMyId()
方法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private void setupMyId () throws IOException { File myIdFile = new File(dataDir, "myid" ); if (!myIdFile.isFile()) { return ; } BufferedReader br = new BufferedReader(new FileReader(myIdFile)); String myIdString; try { myIdString = br.readLine(); } finally { br.close(); } try { serverId = Long.parseLong(myIdString); MDC.put("myid" , myIdString); } catch (NumberFormatException e) { throw new IllegalArgumentException("serverid " + myIdString + " is not a number" ); } }
过期快照删除 QuorumPeerMain.initializeAndRun()
方法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 protected void initializeAndRun (String[] args) throws ConfigException, IOException, AdminServerException { QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1 ) { config.parse(args[0 ]); } DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); if (args.length == 1 && config.isDistributed()) { runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode" ); ZooKeeperServerMain.main(args); } }
DatadirCleanupManager.start()
方法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public void start () { if (PurgeTaskStatus.STARTED == purgeTaskStatus) { LOG.warn("Purge task is already running." ); return ; } if (purgeInterval <= 0 ) { LOG.info("Purge task is not scheduled." ); return ; } timer = new Timer("PurgeTask" , true ); TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount); timer.scheduleAtFixedRate(task, 0 , TimeUnit.HOURS.toMillis(purgeInterval)); purgeTaskStatus = PurgeTaskStatus.STARTED; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 static class PurgeTask extends TimerTask { private File logsDir; private File snapsDir; private int snapRetainCount; public PurgeTask (File dataDir, File snapDir, int count) { logsDir = dataDir; snapsDir = snapDir; snapRetainCount = count; } @Override public void run () { LOG.info("Purge task started." ); try { PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount); } catch (Exception e) { LOG.error("Error occurred while purging." , e); } LOG.info("Purge task completed." ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void purge (File dataDir, File snapDir, int num) throws IOException { if (num < 3 ) { throw new IllegalArgumentException(COUNT_ERR_MSG); } FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir); List<File> snaps = txnLog.findNRecentSnapshots(num); int numSnaps = snaps.size(); if (numSnaps > 0 ) { purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1 )); } }
初始化通信组件 I/O 模型默认为 NIO 通信 QuorumPeerMain.initializeAndRun()
方法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 protected void initializeAndRun (String[] args) throws ConfigException, IOException, AdminServerException { QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1 ) { config.parse(args[0 ]); } DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); if (args.length == 1 && config.isDistributed()) { runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode" ); ZooKeeperServerMain.main(args); } }
QuorumPeerMain.runFromConfig()
方法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 public void runFromConfig (QuorumPeerConfig config) throws IOException, AdminServerException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control" , e); } LOG.info("Starting quorum peer" ); try { ServerCnxnFactory cnxnFactory = null ; ServerCnxnFactory secureCnxnFactory = null ; if (config.getClientPortAddress() != null ) { cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false ); } if (config.getSecureClientPortAddress() != null ) { secureCnxnFactory = ServerCnxnFactory.createFactory(); secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true ); } quorumPeer = getQuorumPeer(); quorumPeer.setTxnFactory(new FileTxnSnapLog( config.getDataLogDir(), config.getDataDir())); quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled()); quorumPeer.enableLocalSessionsUpgrading( config.isLocalSessionsUpgradingEnabled()); quorumPeer.setElectionType(config.getElectionAlg()); quorumPeer.setMyid(config.getServerId()); quorumPeer.setTickTime(config.getTickTime()); quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); quorumPeer.setInitLimit(config.getInitLimit()); quorumPeer.setSyncLimit(config.getSyncLimit()); quorumPeer.setConfigFileName(config.getConfigFilename()); quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false ); if (config.getLastSeenQuorumVerifier()!=null ) { quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false ); } quorumPeer.initConfigInZKDatabase(); 
quorumPeer.setCnxnFactory(cnxnFactory); quorumPeer.setSecureCnxnFactory(secureCnxnFactory); quorumPeer.setSslQuorum(config.isSslQuorum()); quorumPeer.setUsePortUnification(config.shouldUsePortUnification()); quorumPeer.setLearnerType(config.getPeerType()); quorumPeer.setSyncEnabled(config.getSyncEnabled()); quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); if (config.sslQuorumReloadCertFiles) { quorumPeer.getX509Util().enableCertFileReloading(); } quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl); if (quorumPeer.isQuorumSaslAuthEnabled()){ quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl); quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl); quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal); quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext); quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext); } quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); quorumPeer.initialize(); quorumPeer.start(); quorumPeer.join(); } catch (InterruptedException e) { LOG.warn("Quorum Peer interrupted" , e); } }
ServerCnxnFactory.createFactory()
方法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory" ;static public ServerCnxnFactory createFactory () throws IOException { String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY); if (serverCnxnFactoryName == null ) { serverCnxnFactoryName = NIOServerCnxnFactory.class.getName(); } try { ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName) .getDeclaredConstructor().newInstance(); LOG.info("Using {} as server connection factory" , serverCnxnFactoryName); return serverCnxnFactory; } catch (Exception e) { IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName); ioe.initCause(e); throw ioe; } }
在 `zookeeper-docs` 模块的 `zookeeperAdmin.md` 文件中,有如下一段说明:
1 2 3 4 5 serverCnxnFactory : (Java system property: zookeeper.serverCnxnFactory) Specifies ServerCnxnFactory implementation. This should be set to `NettyServerCnxnFactory` in order to use TLS based server communication. Default is `NIOServerCnxnFactory`.
初始化 NIO 服务端的 Socket 提示
这里只是初始化 NIO 服务端的 Socket(创建 `ServerSocketChannel` 并绑定端口),此时 ZooKeeper 服务端尚未真正启动。
NIOServerCnxnFactory.configure()
方法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 @Override public void configure (InetSocketAddress addr, int maxcc, boolean secure) throws IOException { if (secure) { throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn" ); } configureSaslLogin(); maxClientCnxns = maxcc; sessionlessCnxnTimeout = Integer.getInteger( ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000 ); cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout); expirerThread = new ConnectionExpirerThread(); int numCores = Runtime.getRuntime().availableProcessors(); numSelectorThreads = Integer.getInteger( ZOOKEEPER_NIO_NUM_SELECTOR_THREADS, Math.max((int ) Math.sqrt((float ) numCores/2 ), 1 )); if (numSelectorThreads < 1 ) { throw new IOException("numSelectorThreads must be at least 1" ); } numWorkerThreads = Integer.getInteger( ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores); workerShutdownTimeoutMS = Long.getLong( ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000 ); LOG.info("Configuring NIO connection handler with " + (sessionlessCnxnTimeout/1000 ) + "s sessionless connection" + " timeout, " + numSelectorThreads + " selector thread(s), " + (numWorkerThreads > 0 ? numWorkerThreads : "no" ) + " worker threads, and " + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes/1024 ) + " kB direct buffers." ))); for (int i=0 ; i<numSelectorThreads; ++i) { selectorThreads.add(new SelectorThread(i)); } this .ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true ); LOG.info("binding to port " + addr); ss.socket().bind(addr); ss.configureBlocking(false ); acceptThread = new AcceptThread(ss, addr, selectorThreads); }
ZK 服务端加载数据源码分析 冷启动数据恢复快照数据