绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
C#实现请求性校验支持高并发
2019-12-13 15:36:46

来源:JAVA柯尼塞克丶



使用场景描述:

网络请求中经常会遇到发送的请求,服务端响应是成功的,但是返回的时候出现网络故障,导致客户端无法接收到请求结果,那么客户端程序可能判断为网络故障,而重复发送同一个请求。当然如果接口中定义了请求结果查询接口,那么这种重复会相对少一些。特别是交易类的数据,这种操作更是需要避免重复发送请求。另外一种情况是用户过于快速的点击界面按钮,产生连续的相同内容请求,那么后端也需要进行过滤,这种一般出现在系统对接上,无法去控制第三方系统的业务逻辑,需要从自身业务逻辑里面去限定。

其他需求描述:

这类请求一般存在时间范围和高并发的特点,就是短时间内会出现重复的请求,因此对模块需要支持高并发性。

技术实现:

对请求的业务内容进行MD5摘要,并且将MD5摘要存储到缓存中,每个请求数据都通过这个一个公共的调用的方法进行判断。

代码实现:

公共调用代码 UniqueCheck 采用单例模式创建对象,便于在多线程调用的时候,只访问一个统一的缓存库

/*
 * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。
 * 它是被设计用来修饰被不同线程访问和修改的变量。
 * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
 */
 private static readonly object lockHelper = new object();
 
 private volatile static UniqueCheck _instance; 
 
 /// <summary>
 /// 获取单一实例
 /// </summary>
 /// <returns></returns>
 public static UniqueCheck GetInstance()
 {
 if (_instance == null)
 {
 lock (lockHelper)
 {
 if (_instance == null)
 _instance = new UniqueCheck();
 }
 }
 return _instance;
 }


这里需要注意volatile的修饰符,在实际测试过程中,如果没有此修饰符,在高并发的情况下会出现报错。

自定义一个可以进行并发处理队列,代码如下:ConcurrentLinkedQueue

1 using System;
 2 using System.Collections.Generic;
 3 using System.Text;
 4 using System.Threading;
 5 
 6 namespace PackgeUniqueCheck
 7 {
 8 /// <summary>
 9 /// 非加锁并发队列,处理100个并发数以内
 10 /// </summary>
 11 /// <typeparam name="T"></typeparam>
 12 public class ConcurrentLinkedQueue<T>
 13 {
 14 private class Node<K>
 15 {
 16 internal K Item;
 17 internal Node<K> Next;
 18 
 19 public Node(K item, Node<K> next)
 20 {
 21 this.Item = item;
 22 this.Next = next;
 23 }
 24 }
 25 
 26 private Node<T> _head;
 27 private Node<T> _tail;
 28 
 29 public ConcurrentLinkedQueue()
 30 {
 31 _head = new Node<T>(default(T), null);
 32 _tail = _head;
 33 }
 34 
 35 public bool IsEmpty
 36 {
 37 get { return (_head.Next == null); }
 38 }
 39 /// <summary>
 40 /// 进入队列
 41 /// </summary>
 42 /// <param name="item"></param>
 43 public void Enqueue(T item)
 44 {
 45 Node<T> newNode = new Node<T>(item, null);
 46 while (true)
 47 {
 48 Node<T> curTail = _tail;
 49 Node<T> residue = curTail.Next;
 50 
 51 //判断_tail是否被其他process改变
 52 if (curTail == _tail)
 53 {
 54 //A 有其他process执行C成功,_tail应该指向新的节点
 55 if (residue == null)
 56 {
 57 //C 其他process改变了tail节点,需要重新取tail节点
 58 if (Interlocked.CompareExchange<Node<T>>(
 59 ref curTail.Next, newNode, residue) == residue)
 60 {
 61 //D 尝试修改tail
 62 Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail);
 63 return;
 64 }
 65 }
 66 else
 67 {
 68 //B 帮助其他线程完成D操作
 69 Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail);
 70 }
 71 }
 72 }
 73 }
 74 /// <summary>
 75 /// 队列取数据
 76 /// </summary>
 77 /// <param name="result"></param>
 78 /// <returns></returns>
 79 public bool TryDequeue(out T result)
 80 {
 81 Node<T> curHead;
 82 Node<T> curTail;
 83 Node<T> next;
 84 while (true)
 85 {
 86 curHead = _head;
 87 curTail = _tail;
 88 next = curHead.Next;
 89 if (curHead == _head)
 90 {
 91 if (next == null) //Queue为空
 92 {
 93 result = default(T);
 94 return false;
 95 }
 96 if (curHead == curTail) //Queue处于Enqueue个node的过程中
 97 {
 98 //尝试帮助其他Process完成操作
 99 Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail);
100 }
101 else
102 {
103 //取next.Item必须放到CAS之前
104 result = next.Item;
105 //如果_head没有发生改变,则将_head指向next并退出
106 if (Interlocked.CompareExchange<Node<T>>(ref _head,
107 next, curHead) == curHead)
108 break;
109 }
110 }
111 }
112 return true;
113 }
114 /// <summary>
115 /// 尝试获取后一个对象
116 /// </summary>
117 /// <param name="result"></param>
118 /// <returns></returns>
119 public bool TryGetTail(out T result)
120 {
121 result = default(T);
122 if (_tail == null)
123 {
124 return false;
125 }
126 result = _tail.Item;
127 return true;
128 }
129 }
130 }


虽然是一个非常简单的性校验逻辑,但是要做到高效率,高并发支持,高可靠性,以及低内存占用,需要实现这样的需求,需要做细致的模拟测试。

1 using System;
 2 using System.Collections.Generic;
 3 using System.Text;
 4 using System.Threading;
 5 using System.Collections;
 6 
 7 namespace PackgeUniqueCheck
 8 {
 9 public class UniqueCheck
 10 {
 11 /*
 12 * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。
 13 * 它是被设计用来修饰被不同线程访问和修改的变量。
 14 * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
 15 */
 16 private static readonly object lockHelper = new object();
 17 
 18 private volatile static UniqueCheck _instance; 
 19 
 20 /// <summary>
 21 /// 获取单一实例
 22 /// </summary>
 23 /// <returns></returns>
 24 public static UniqueCheck GetInstance()
 25 {
 26 if (_instance == null)
 27 {
 28 lock (lockHelper)
 29 {
 30 if (_instance == null)
 31 _instance = new UniqueCheck();
 32 }
 33 }
 34 return _instance;
 35 }
 36 
 37 private UniqueCheck()
 38 {
 39 //创建一个线程安全的哈希表,作为字典缓存
 40 _DataKey = Hashtable.Synchronized(new Hashtable());
 41 Queue myqueue = new Queue();
 42 _DataQueue = Queue.Synchronized(myqueue);
 43 _Myqueue = new ConcurrentLinkedQueue<string>();
 44 _Timer = new Thread(DoTicket);
 45 _Timer.Start();
 46 }
 47 
 48 #region 公共属性设置
 49 /// <summary>
 50 /// 设定定时线程的休眠时间长度:默认为1分钟
 51 /// 时间范围:1-7200000,值为1毫秒到2小时
 52 /// </summary>
 53 /// <param name="value"></param>
 54 public void SetTimeSpan(int value)
 55 {
 56 if (value > 0&& value <=7200000)
 57 {
 58 _TimeSpan = value;
 59 }
 60 }
 61 /// <summary>
 62 /// 设定缓存Cache中的大记录条数
 63 /// 值范围:1-5000000,1到500万
 64 /// </summary>
 65 /// <param name="value"></param>
 66 public void SetCacheMaxNum(int value)
 67 {
 68 if (value > 0 && value <= 5000000)
 69 {
 70 _CacheMaxNum = value;
 71 }
 72 }
 73 /// <summary>
 74 /// 设置是否在控制台中显示日志
 75 /// </summary>
 76 /// <param name="value"></param>
 77 public void SetIsShowMsg(bool value)
 78 {
 79 Helper.IsShowMsg = value;
 80 }
 81 /// <summary>
 82 /// 线程请求阻塞增量
 83 /// 值范围:1-CacheMaxNum,建议设置为缓存大值的10%-20%
 84 /// </summary>
 85 /// <param name="value"></param>
 86 public void SetBlockNumExt(int value)
 87 {
 88 if (value > 0 && value <= _CacheMaxNum)
 89 {
 90 _BlockNumExt = value;
 91 }
 92 }
 93 /// <summary>
 94 /// 请求阻塞时间
 95 /// 值范围:1-max,根据阻塞增量设置请求阻塞时间
 96 /// 阻塞时间越长,阻塞增量可以设置越大,但是请求实时响应就越差
 97 /// </summary>
 98 /// <param name="value"></param>
 99 public void SetBlockSpanTime(int value)
100 {
101 if (value > 0)
102 {
103 _BlockSpanTime = value;
104 }
105 }
106 #endregion
107 
108 #region 私有变量
109 /// <summary>
110 /// 内部运行线程
111 /// </summary>
112 private Thread _runner = null;
113 /// <summary>
114 /// 可处理高并发的队列
115 /// </summary>
116 private ConcurrentLinkedQueue<string> _Myqueue = null;
117 /// <summary>
118 /// 内容的时间健值对
119 /// </summary>
120 private Hashtable _DataKey = null;
121 /// <summary>
122 /// 内容时间队列
123 /// </summary>
124 private Queue _DataQueue = null;
125 /// <summary>
126 /// 定时线程的休眠时间长度:默认为1分钟
127 /// </summary>
128 private int _TimeSpan = 3000;
129 /// <summary>
130 /// 定时计时器线程
131 /// </summary>
132 private Thread _Timer = null;
133 /// <summary>
134 /// 缓存Cache中的大记录条数
135 /// </summary>
136 private int _CacheMaxNum = 500000;
137 /// <summary>
138 /// 线程请求阻塞增量
139 /// </summary>
140 private int _BlockNumExt = 10000;
141 /// <summary>
142 /// 请求阻塞时间
143 /// </summary>
144 private int _BlockSpanTime = 100;
145 #endregion
146 
147 #region 私有方法
148 private void StartRun()
149 {
150 _runner = new Thread(DoAction);
151 _runner.Start();
152 Helper.ShowMsg("内部线程启动成功!");
153 }
154 
155 private string GetItem()
156 {
157 string tp = string.Empty;
158 bool result = _Myqueue.TryDequeue(out tp);
159 return tp;
160 }
161 /// <summary>
162 /// 执行循环操作
163 /// </summary>
164 private void DoAction()
165 {
166 while (true)
167 {
168 while (!_Myqueue.IsEmpty)
169 {
170 string item = GetItem();
171 _DataQueue.Enqueue(item);
172 if (!_DataKey.ContainsKey(item))
173 {
174 _DataKey.Add(item, DateTime.Now);
175 }
176 }
177 //Helper.ShowMsg("当前数组已经为空,处理线程进入休眠状态...");
178 Thread.Sleep(2);
179 }
180 }
181 /// <summary>
182 /// 执行定时器的动作
183 /// </summary>
184 private void DoTicket()
185 {
186 while (true)
187 {
188 Helper.ShowMsg("当前数据队列个数:" + _DataQueue.Count.ToString());
189 if (_DataQueue.Count > _CacheMaxNum)
190 {
191 while (true)
192 {
193 Helper.ShowMsg(string.Format("当前队列数:{0},已经超出大长度:{1},开始进行清理操作...", _DataQueue.Count, _CacheMaxNum.ToString()));
194 string item = _DataQueue.Dequeue().ToString();
195 if (!string.IsNullOrEmpty(item))
196 {
197 if (_DataKey.ContainsKey(item))
198 {
199 _DataKey.Remove(item);
200 }
201 if (_DataQueue.Count <= _CacheMaxNum)
202 {
203 Helper.ShowMsg("清理完成,开始休眠清理线程...");
204 break;
205 }
206 }
207 }
208 }
209 Thread.Sleep(_TimeSpan);
210 }
211 }
212 
213 /// <summary>
214 /// 线程进行睡眠等待
215 /// 如果当前负载压力大大超出了线程的处理能力
216 /// 那么需要进行延时调用
217 /// </summary>
218 private void BlockThread()
219 {
220 if (_DataQueue.Count > _CacheMaxNum + _BlockNumExt)
221 {
222 Thread.Sleep(_BlockSpanTime);
223 }
224 }
225 #endregion
226 
227 #region 公共方法
228 /// <summary>
229 /// 开启服务线程
230 /// </summary>
231 public void Start()
232 {
233 if (_runner == null)
234 {
235 StartRun();
236 }
237 else
238 {
239 if (_runner.IsAlive == false)
240 {
241 StartRun();
242 }
243 }
244 
245 }
246 /// <summary>
247 /// 关闭服务线程
248 /// </summary>
249 public void Stop()
250 {
251 if (_runner != null)
252 {
253 _runner.Abort();
254 _runner = null;
255 }
256 }
257 
258 /// <summary>
259 /// 添加内容信息
260 /// </summary>
261 /// <param name="item">内容信息</param>
262 /// <returns>true:缓存中不包含此值,队列添加成功,false:缓存中包含此值,队列添加失败</returns>
263 public bool AddItem(string item)
264 {
265 BlockThread();
266 item = Helper.MakeMd5(item);
267 if (_DataKey.ContainsKey(item))
268 {
269 return false;
270 }
271 else
272 {
273 _Myqueue.Enqueue(item);
274 return true;
275 }
276 }
277 /// <summary>
278 /// 判断内容信息是否已经存在
279 /// </summary>
280 /// <param name="item">内容信息</param>
281 /// <returns>true:信息已经存在于缓存中,false:信息不存在于缓存中</returns>
282 public bool CheckItem(string item)
283 {
284 item = Helper.MakeMd5(item);
285 return _DataKey.ContainsKey(item);
286 }
287 #endregion 
288 
289 }
290 }


模拟测试代码:

private static string _example = Guid.NewGuid().ToString();
 private static UniqueCheck _uck = null;
 static void Main(string[] args)
 {
 _uck = UniqueCheck.GetInstance();
 _uck.Start();
 _uck.SetIsShowMsg(false);
 _uck.SetCacheMaxNum(20000000);
 _uck.SetBlockNumExt(1000000);
 _uck.SetTimeSpan(6000);
 _uck.AddItem(_example);
 Thread[] threads = new Thread[20];
 for (int i = 0; i < 20; i++)
 {
 threads[i] = new Thread(AddInfo);
 threads[i].Start();
 }
 Thread checkthread = new Thread(CheckInfo);
 checkthread.Start();
 string value = Console.ReadLine();
 checkthread.Abort();
 for (int i = 0; i < 50; i++)
 {
 threads[i].Abort();
 }
 _uck.Stop();
 }
 static void AddInfo()
 {
 while (true)
 {
 _uck.AddItem(Guid.NewGuid().ToString());
 }
 }
 static void CheckInfo()
 {
 while (true)
 {
 Console.WriteLine("开始时间:{0}...", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
 Console.WriteLine("插入结果:{0}", _uck.AddItem(_example));
 Console.WriteLine("结束时间:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
          //调整进程休眠时间,可以测试高并发的情况
 //Thread.Sleep(1000);
 }
 
 }

测试截图:

分享好友

分享这个小栈给你的朋友们,一起进步吧。

IT知识联盟
创建时间:2019-07-05 15:30:45
分享收集到的大小知识点
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

栈主、嘉宾

查看更多
  • 王超
    栈主

小栈成员

查看更多
  • ?
  • youou
  • gamebus
  • chinacc
戳我,来吐槽~