Memcached Enyim.Caching 解决无法遍历获取Key的问题

亦凉 2022-09-02 01:16 74阅读 0赞

STATS命令

出于性能考虑,memcached没有提供遍历功能,不过我们可以通过以下两个stats命令得到所有的缓存对象。

1、stats items

显示各个slab中item的数目。

2、stats cachedump slab_id limit_num
显示某个slab中的前limit_num个key列表,显示格式:ITEM key_name [ value_length b; expire_time|access_time s]

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2tlc3NoZWk_size_16_color_FFFFFF_t_70

20210729161717908.png

基于此 我们就可以通过 扩展类的方式来实现其业务逻辑

  1. public class MemcachedClient : Enyim.Caching.MemcachedClient
  2. {
  3. public MemcachedClient(string sectionName) : base(sectionName)
  4. {
  5. }
  6. public IDictionary<IPEndPoint, List<string>> GetKeys()
  7. {
  8. var data = new Dictionary<IPEndPoint, List<string>>();
  9. var result = StatsItems("items");
  10. var itemsData = new Dictionary<IPEndPoint, Dictionary<string, string>>();
  11. foreach (var item in result)
  12. {
  13. var ip = item.Key;
  14. var items = new Dictionary<string, string>();
  15. foreach (var info in item.Value)
  16. {
  17. var key = info.Key;
  18. var strArry = key.Split(new string[] { ":" }, StringSplitOptions.RemoveEmptyEntries);
  19. if (strArry.Length == 3 && strArry[2] == "number")
  20. {
  21. key = strArry[1].Trim();
  22. var value = info.Value;
  23. items.Add(key, value);
  24. }
  25. }
  26. if (items.Any())
  27. {
  28. itemsData.Add(ip, items);
  29. }
  30. }
  31. if (itemsData.Any())
  32. {
  33. foreach (var item in itemsData)
  34. {
  35. var ip = item.Key;
  36. var list = new List<string>();
  37. foreach (var info in item.Value)
  38. {
  39. var dump = StatsItems($"cachedump {info.Key} {info.Value}");
  40. if (dump.TryGetValue(ip, out Dictionary<string, string> dics))
  41. {
  42. foreach (var dic in dics)
  43. {
  44. list.Add(dic.Key);
  45. }
  46. }
  47. }
  48. if (list.Any())
  49. {
  50. data.Add(ip, list);
  51. }
  52. }
  53. }
  54. return data;
  55. }
  56. private Dictionary<IPEndPoint, Dictionary<string, string>> StatsItems(string type)
  57. {
  58. var results = new Dictionary<IPEndPoint, Dictionary<string, string>>();
  59. foreach (var node in this.Pool.GetWorkingNodes())
  60. {
  61. IStatsOperation cmd = new StatsItemsOperation(type);
  62. node.Execute(cmd);
  63. results[node.EndPoint] = cmd.Result;
  64. }
  65. return results;
  66. }
  67. }
  68. public class StatsItemsOperation : Operation, IStatsOperation
  69. {
  70. private static Enyim.Caching.ILog log = Enyim.Caching.LogManager.GetLogger(typeof(StatsOperation));
  71. private string type;
  72. private Dictionary<string, string> result;
  73. public StatsItemsOperation(string type)
  74. {
  75. this.type = type;
  76. }
  77. protected override IList<ArraySegment<byte>> GetBuffer()
  78. {
  79. var command = String.IsNullOrEmpty(this.type)
  80. ? "stats" + TextSocketHelper.CommandTerminator
  81. : "stats " + this.type + TextSocketHelper.CommandTerminator;
  82. return TextSocketHelper.GetCommandBuffer(command);
  83. }
  84. protected override IOperationResult ReadResponse(PooledSocket socket)
  85. {
  86. var serverData = new Dictionary<string, string>();
  87. while (true)
  88. {
  89. string line = TextSocketHelper.ReadResponse(socket);
  90. // stat values are terminated by END
  91. if (String.Compare(line, "END", StringComparison.Ordinal) == 0)
  92. break;
  93. expected response is STAT item_name item_value
  94. //if (line.Length < 6 || String.Compare(line, 0, "STAT ", 0, 5, StringComparison.Ordinal) != 0)
  95. //{
  96. // if (log.IsWarnEnabled)
  97. // log.Warn("Unknow response: " + line);
  98. // continue;
  99. //}
  100. // get the key&value
  101. string[] parts = line.Remove(0, 5).Split(' ');
  102. //if (parts.Length != 2)
  103. //{
  104. // if (log.IsWarnEnabled)
  105. // log.Warn("Unknow response: " + line);
  106. // continue;
  107. //}
  108. // store the stat item
  109. serverData[parts[0]] = parts[1];
  110. }
  111. this.result = serverData;
  112. return new TextOperationResult().Pass();
  113. }
  114. Dictionary<string, string> IStatsOperation.Result
  115. {
  116. get { return result; }
  117. }
  118. protected override bool ReadResponseAsync(PooledSocket socket, System.Action<bool> next)
  119. {
  120. throw new System.NotSupportedException();
  121. }
  122. }
  123. internal static class TextSocketHelper
  124. {
  125. private const string GenericErrorResponse = "ERROR";
  126. private const string ClientErrorResponse = "CLIENT_ERROR ";
  127. private const string ServerErrorResponse = "SERVER_ERROR ";
  128. private const int ErrorResponseLength = 13;
  129. public const string CommandTerminator = "\r\n";
  130. private static readonly Enyim.Caching.ILog log = Enyim.Caching.LogManager.GetLogger(typeof(TextSocketHelper));
  131. /// <summary>
  132. /// Reads the response of the server.
  133. /// </summary>
  134. /// <returns>The data sent by the memcached server.</returns>
  135. /// <exception cref="T:System.InvalidOperationException">The server did not sent a response or an empty line was returned.</exception>
  136. /// <exception cref="T:Enyim.Caching.Memcached.MemcachedException">The server did not specified any reason just returned the string ERROR. - or - The server returned a SERVER_ERROR, in this case the Message of the exception is the message returned by the server.</exception>
  137. /// <exception cref="T:Enyim.Caching.Memcached.MemcachedClientException">The server did not recognize the request sent by the client. The Message of the exception is the message returned by the server.</exception>
  138. public static string ReadResponse(PooledSocket socket)
  139. {
  140. string response = TextSocketHelper.ReadLine(socket);
  141. if (log.IsDebugEnabled)
  142. log.Debug("Received response: " + response);
  143. if (String.IsNullOrEmpty(response))
  144. throw new MemcachedClientException("Empty response received.");
  145. if (String.Compare(response, GenericErrorResponse, StringComparison.Ordinal) == 0)
  146. throw new NotSupportedException("Operation is not supported by the server or the request was malformed. If the latter please report the bug to the developers.");
  147. if (response.Length >= ErrorResponseLength)
  148. {
  149. if (String.Compare(response, 0, ClientErrorResponse, 0, ErrorResponseLength, StringComparison.Ordinal) == 0)
  150. {
  151. throw new MemcachedClientException(response.Remove(0, ErrorResponseLength));
  152. }
  153. else if (String.Compare(response, 0, ServerErrorResponse, 0, ErrorResponseLength, StringComparison.Ordinal) == 0)
  154. {
  155. throw new MemcachedException(response.Remove(0, ErrorResponseLength));
  156. }
  157. }
  158. return response;
  159. }
  160. /// <summary>
  161. /// Reads a line from the socket. A line is terninated by \r\n.
  162. /// </summary>
  163. /// <returns></returns>
  164. private static string ReadLine(PooledSocket socket)
  165. {
  166. MemoryStream ms = new MemoryStream(50);
  167. bool gotR = false;
  168. //byte[] buffer = new byte[1];
  169. int data;
  170. while (true)
  171. {
  172. data = socket.ReadByte();
  173. if (data == 13)
  174. {
  175. gotR = true;
  176. continue;
  177. }
  178. if (gotR)
  179. {
  180. if (data == 10)
  181. break;
  182. ms.WriteByte(13);
  183. gotR = false;
  184. }
  185. ms.WriteByte((byte)data);
  186. }
  187. string retval = Encoding.ASCII.GetString(ms.GetBuffer(), 0, (int)ms.Length);
  188. if (log.IsDebugEnabled)
  189. log.Debug("ReadLine: " + retval);
  190. return retval;
  191. }
  192. /// <summary>
  193. /// Gets the bytes representing the specified command. returned buffer can be used to streamline multiple writes into one Write on the Socket
  194. /// using the <see cref="M:Enyim.Caching.Memcached.PooledSocket.Write(IList<ArraySegment<byte>>)"/>
  195. /// </summary>
  196. /// <param name="value">The command to be converted.</param>
  197. /// <returns>The buffer containing the bytes representing the command. The command must be terminated by \r\n.</returns>
  198. /// <remarks>The Nagle algorithm is disabled on the socket to speed things up, so it's recommended to convert a command into a buffer
  199. /// and use the <see cref="M:Enyim.Caching.Memcached.PooledSocket.Write(IList<ArraySegment<byte>>)"/> to send the command and the additional buffers in one transaction.</remarks>
  200. public static IList<ArraySegment<byte>> GetCommandBuffer(string value)
  201. {
  202. var data = new ArraySegment<byte>(Encoding.ASCII.GetBytes(value));
  203. return new ArraySegment<byte>[] { data };
  204. }
  205. }

使用方式;

20210729161937737.png

发表评论

表情:
评论列表 (有 0 条评论,74人围观)

还没有评论,来说两句吧...

相关阅读