ZooKeeper Distributed lock

雨点打透心脏的1/2处 2023-08-17 17:46 197阅读 0赞
  • https://segmentfault.com/a/1190000016351095
  • http://www.dengshenyu.com/java/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/10/23/zookeeper-distributed-lock.html

Test

  1. Enumerable.Range(1, 5).ToList().ForEach(i =>
  2. {
  3. Task.Run(() =>
  4. {
  5. var lockHelper = new ZooKeeperLockHelper("localhost:5181");
  6. lockHelper.OnAcquireLock += (id) =>
  7. {
  8. var random = new Random().Next(10);
  9. Log.Debug("NodeId {@id} executing.....Sleep {@ms} ms", id, random * 1000);
  10. Thread.Sleep(random * 1000);
  11. Log.Debug("NodeId {@id} executing success", id);
  12. return Task.CompletedTask;
  13. };
  14. lockHelper.AcquireLock();
  15. });
  16. });
  17. using org.apache.zookeeper;
  18. using Serilog;
  19. using System;
  20. using System.Collections.Generic;
  21. using System.Linq;
  22. using System.Text;
  23. using System.Text.RegularExpressions;
  24. using System.Threading;
  25. using System.Threading.Tasks;
  26. namespace RedisDemo
  27. {
  28. public class ZooKeeperLockHelper : Watcher, IDisposable
  29. {
  30. #region event
  31. public event Func<long, Task> OnAcquireLock;
  32. #endregion
  33. private bool _disposed;
  34. private ZooKeeper _zooKeeper;
  35. private Event.KeeperState _currentState;
  36. private AutoResetEvent _notifyEvent = new AutoResetEvent(false);
  37. private string _connectionString;
  38. private bool _hasAcquireLock;
  39. private string _lockPath;
  40. private long _currentNodeId;
  41. private static readonly string DEFAULT_PATH = "/zk";
  42. private static readonly string NODE_NAME = "node-";
  43. public ZooKeeperLockHelper(string connectionString)
  44. {
  45. _connectionString = connectionString;
  46. this.Initialize(_connectionString, TimeSpan.FromSeconds(60));
  47. }
  48. public void AcquireLock(string path = "")
  49. {
  50. if (this._hasAcquireLock)
  51. {
  52. FireAcquireLock(this._currentNodeId).Wait();
  53. return;
  54. }
  55. if (!WaitConnected(TimeSpan.FromSeconds(10)))
  56. {
  57. throw new Exception($"{_connectionString} Cannot Connect ZooKeeper");
  58. }
  59. _lockPath = path;
  60. if (string.IsNullOrEmpty(_lockPath))
  61. {
  62. _lockPath = DEFAULT_PATH;
  63. }
  64. var nodePath = _lockPath + "/" + NODE_NAME;
  65. var spath = this._zooKeeper.createAsync(
  66. nodePath, Encoding.UTF8.GetBytes("data"),
  67. ZooDefs.Ids.OPEN_ACL_UNSAFE,
  68. CreateMode.EPHEMERAL_SEQUENTIAL).Result;
  69. this._currentNodeId = ParseNodeId(spath);
  70. var reuslt = this._zooKeeper.getChildrenAsync(_lockPath, true).GetAwaiter().GetResult();
  71. Log.Debug("#-> Begin Acquire Lock CurrentId {@id}", _currentNodeId);
  72. if (this.IsMinNodeId(reuslt, this._currentNodeId))
  73. {
  74. lock (this)
  75. {
  76. if (!this._hasAcquireLock)
  77. {
  78. Log.Debug("NodeId {@id} Direct Acquire Lock", _currentNodeId);
  79. this._hasAcquireLock = true;
  80. this.FireAcquireLock(this._currentNodeId).Wait();
  81. }
  82. }
  83. }
  84. }
  85. protected bool IsMinNodeId(ChildrenResult childrenResult, long nodeId)
  86. {
  87. if (nodeId == 0 || childrenResult == null || childrenResult.Children.Count == 0)
  88. return false;
  89. var nodeIds = new List<long>();
  90. foreach (var item in childrenResult.Children)
  91. {
  92. nodeIds.Add(ParseNodeId(item));
  93. }
  94. if (nodeIds.Count > 0 && nodeIds.Min() == nodeId)
  95. {
  96. return true;
  97. }
  98. return false;
  99. }
  100. protected long ParseNodeId(string path)
  101. {
  102. var m = Regex.Match(path, "(\\d+)");
  103. if (m.Success)
  104. {
  105. return long.Parse(m.Groups[0].Value);
  106. }
  107. return 0L;
  108. }
  109. protected void Initialize(String connectionString, TimeSpan sessionTimeout)
  110. {
  111. this._zooKeeper = new ZooKeeper(connectionString, (int)sessionTimeout.TotalMilliseconds, this);
  112. }
  113. public Task FireAcquireLock(long id)
  114. {
  115. this.OnAcquireLock(id).Wait();
  116. this.CloseConnection();
  117. Log.Debug("NodeId {@id} Close ZooKeeper Success", id);
  118. return Task.CompletedTask;
  119. }
  120. public bool WaitConnected(TimeSpan timeout)
  121. {
  122. var continueWait = false;
  123. while (this._currentState != Event.KeeperState.SyncConnected)
  124. {
  125. continueWait = _notifyEvent.WaitOne(timeout);
  126. if (!continueWait)
  127. {
  128. return false;
  129. }
  130. }
  131. return true;
  132. }
  133. protected void CloseConnection()
  134. {
  135. if (_disposed)
  136. {
  137. return;
  138. }
  139. _disposed = true;
  140. if (_zooKeeper != null)
  141. {
  142. try
  143. {
  144. this._zooKeeper.closeAsync().ConfigureAwait(false).GetAwaiter().GetResult();
  145. }
  146. catch { }
  147. }
  148. }
  149. #region Watcher Impl
  150. public override Task process(WatchedEvent @event)
  151. {
  152. if (@event.getState() == Event.KeeperState.SyncConnected)
  153. {
  154. if (String.IsNullOrEmpty(@event.getPath()))
  155. {
  156. this._currentState = @event.getState();
  157. this._notifyEvent.Set();
  158. }
  159. var path = @event.getPath();
  160. if (!string.IsNullOrEmpty(path))
  161. {
  162. if (path.Equals(this._lockPath))
  163. {
  164. Log.Debug("NodeId {@id} Start Watcher Callback", this._currentNodeId);
  165. if (this._hasAcquireLock)
  166. {
  167. Log.Debug("NodeId {@id} Has Acquire Lock return", this._currentNodeId);
  168. return Task.CompletedTask;
  169. }
  170. Task.Run(() =>
  171. {
  172. var childrenResult = _zooKeeper.getChildrenAsync(this._lockPath, this).Result;
  173. if (IsMinNodeId(childrenResult, this._currentNodeId))
  174. {
  175. lock (this)
  176. {
  177. if (!this._hasAcquireLock)
  178. {
  179. Log.Debug("NodeId {@id} Acquire Lock", this._currentNodeId);
  180. this._hasAcquireLock = true;
  181. this.FireAcquireLock(this._currentNodeId).Wait();
  182. }
  183. }
  184. }
  185. });
  186. //_zooKeeper.getChildrenAsync(_lockPath, this);
  187. }
  188. }
  189. }
  190. return Task.CompletedTask;
  191. }
  192. public void Dispose()
  193. {
  194. this.CloseConnection();
  195. }
  196. #endregion
  197. }
  198. }

转载于:https://www.cnblogs.com/byxxw/p/11352564.html

发表评论

表情:
评论列表 (有 0 条评论,197人围观)

还没有评论,来说两句吧...

相关阅读