|
无锁队列的优点如下
1、元素是先进先出的
由队列的性质保证的,在环形队列中通过对队列的顺序访问保证
2、空间可以重复利用
因为一般的环形队列都是一个元素数固定的一个闭环,可以在环形队列初始化的时候分配好确定的内存空间,当进队或出队时只需要返回指定元素内存空间的地址即可,这些内存空间可以重复利用,避免频繁内存分配和释放的开销
3、消息处理单线程化
最典型的生产者消费者模型中,如果引入环形队列,那么生成者只需要生成“东西”然后放到环形队列中即可,而消费者只需要从环形队列里取“东西”并且消费即可,没有任何锁或者等待,巧妙的高效实现了多线程数据通信。
这样处理没有了mutex,但效率较高。值得注意的是,当write和read相等时,表示满也表示队满也表示队空,这个队列是2个线程共享的,因此在这里是保证队空和队满时的线程安全,那么整个队列就是安全的,因为其他情况下read和write根本没有交集,当他们都在读写同一个index的时候,read线程先取走,再把标识置为false,write线程是先把内容写入完后再置true,这个读写和置false true的顺序一定不能乱否则就会标记位准备好了 。
- #include <iostream>
- #include <pthread.h>
- #include <queue>
- #include <strings.h>
- #include <sstream>
- #include <unistd.h>
- using QueuePair = std::pair<unsigned int, unsigned char*>;
- using CmdQueue = std::pair<volatile bool, QueuePair>;
- class NonLockQueue
- {
- public:
- NonLockQueue():read(0), write(0), msg(NULL), msgLen(0)
- {}
- ~NonLockQueue()
- {
- }
- QueuePair *front()
- {
- if(cmdQueue[read].first)
- return &cmdQueue[read].second;
- return NULL;
- }
- void pop()
- {
- cmdQueue[read].first = false;
- read = (++read) % 1024;
- }
- bool push(const void *msg, const unsigned int len)
- {
- unsigned char* buf = new unsigned char[len];
- if(buf)
- {
- bcopy(msg, buf, len);
- if(!dump() && !cmdQueue[write].first)
- {
- cmdQueue[write].second.first = len;
- cmdQueue[write].second.second = buf;
- cmdQueue[write].first = true;
- write = (++write) % 1024;
- return true;
- }
- else
- {
- //如果队列满了,则写入缓冲
- cacheQueue.push(std::make_pair(len, buf));
- }
- return true;
- }
- return false;
- }
- private:
- bool dump()
- {
- //缓冲中还有数据
- while(!cacheQueue.empty())
- {
- if(cmdQueue[write].first)
- {
- return true;
- }
- // 优先将缓冲中的数据写入到队列
- cmdQueue[write].second = cacheQueue.front();
- cmdQueue[write].first = true;
- write = ++write % 1024;
- cacheQueue.pop();
- }
- return false;
- }
- CmdQueue cmdQueue[1024];
- std::queue<QueuePair> cacheQueue;
- unsigned int read;
- unsigned int write;
- unsigned char *msg;
- unsigned int msgLen;
- };
- static void *funWrite(void *arg)
- {
- NonLockQueue* pQue = (NonLockQueue*)arg;
- for(int i = 0; i < 1000; ++i)
- {
- std::stringstream ss;
- ss << i;
- pQue->push(ss.str().c_str(), ss.str().size() + 1);
- }
- return NULL;
- }
- static void* funRead(void *arg)
- {
- NonLockQueue* pQue = (NonLockQueue*)arg;
- while(true)
- {
- QueuePair* pPair = pQue->front();
- if(!pPair)
- {
- continue;
- }
- std::cout << pPair->first << "----" << pPair->second << std::endl;
- pQue->pop();
- }
- return NULL;
- }
- int main()
- {
- NonLockQueue que;
- pthread_t t1, t2;
- pthread_create(&t1, NULL, funWrite, &que);
- pthread_create(&t2, NULL, funRead, &que);
- while(true)
- {
- }
- pthread_join(t1, NULL);
- pthread_join(t2, NULL);
- return 0;
- }
复制代码 部分结果如下:
不用标识的实现方法,这里空出一个单元来标识队满,这样的原则:队列中有元素则可读,队列满则写缓存
- (write - read + MAX_SIZE) % MAX_SIZE >= 1 //队列中有元素
- (write + 1) % MAX_SIZE == read //队列满
复制代码 另外一种方法
- #include <iostream>
- #include <pthread.h>
- #include <queue>
- #include <strings.h>
- #include <sstream>
- #include <unistd.h>
- using QueuePair = std::pair<unsigned int, unsigned char*>;
- const unsigned int MAX_SIZE = 1024;
- class NonLockQueue
- {
- public:
- NonLockQueue():read(0), write(0) {}
- ~NonLockQueue()
- {
- }
- QueuePair *front()
- {
- if((write - read + MAX_SIZE) % MAX_SIZE >= 1)
- return &cmdQueue[read];
- return NULL;
- }
- void pop()
- {
- if((write - read + MAX_SIZE) % MAX_SIZE >= 1)
- read = (++read) % MAX_SIZE;
- else
- return;
- }
- bool push(const void *msg, const unsigned int len)
- {
- unsigned char* buf = new unsigned char[len];
- if(buf)
- {
- bcopy(msg, buf, len);
- if(!dump() && !((write + 1) % MAX_SIZE == read))
- {
- cmdQueue[write].first = len;
- cmdQueue[write].second = buf;
- write = (++write) % MAX_SIZE;
- return true;
- }
- else
- {
- //如果队列满了,则写入缓冲
- cacheQueue.push(std::make_pair(len, buf));
- }
- return true;
- }
- return false;
- }
- private:
- bool dump()
- {
- //缓冲中还有数据
- while(!cacheQueue.empty())
- {
- if((write + 1) % MAX_SIZE == read)
- return true;
- // 优先将缓冲中的数据写入到队列
- cmdQueue[write] = cacheQueue.front();
- write = ++write % MAX_SIZE;
- cacheQueue.pop();
- }
- return false;
- }
- QueuePair cmdQueue[MAX_SIZE];
- std::queue<QueuePair> cacheQueue;
- unsigned int read;
- unsigned int write;
- };
复制代码
|
|