flat7th

# メッセージキュー

created 2012-11-26 modified 2012-11-26 

Linuxのメッセージキューを作成・削除・書き込み・読み取り・クリアするコマンド。
何かの役に立つなら、修正済みBSDライセンスまたはX11ライセンスでどうぞ(= 商用利用可で何か書く必要もなし)。

./mq
        -n MQ_MAXMSG
        -s MQ_MSGSIZE
        -q MQ_NAME
        -m MSG
        -C              ... Create MQ
        -D              ... Delete MQ
        -R              ... Recv from MQ
        -S              ... Send to MQ
        -L              ... cLear MQ

makefile
mq: mq.cc
	g++ -o $@ $< -lrt

mq.cc
// mq.cc
//
// kuruma 2012/11/26
// LICENSE: modified BSD or X11.
// this is what this is.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h> //getopt
#include <fcntl.h> // O_* constants
#include <sys/stat.h> // mode constants
#include <mqueue.h> // mq_open
#include <errno.h>

#include <iostream>
#include <string>


// Debug logging helper: writes one line to stderr consisting of a fixed
// "        [debug]" prefix, the calling function name, the source line number,
// and the printf-style formatted message, followed by a newline.
// `fmt` must be a string literal because it is concatenated with the prefix.
// `##__VA_ARGS__` is the GNU extension that drops the preceding comma when no
// variadic arguments are supplied.
#define log(fmt, ...) do {							\
		fprintf(stderr,								\
				"        [debug]%s:%d:" fmt "\n",	\
				__func__, __LINE__, ##__VA_ARGS__);	\
	} while(0)


// Size in bytes of the receive buffers in do_recv()/do_clear(), and the upper
// bound that init() enforces on the -s (MQ_MSGSIZE) option.
#define MY_MAX_MSGSIZE 1024


// Action selected on the command line; UNKNOWN means no action option was given.
enum e_todo {
	UNKNOWN = 0,	// no action selected
	CREATE,			// -C
	DELETE,			// -D
	SEND,			// -S
	RECV,			// -R
	CLEAR			// -L
};


struct global
{
public:
	global() {};

public:
	const char *cmd_name;
	enum e_todo todo;
	std::string mq_name;
	int mq_maxmsg;
	int mq_msgsize;
	std::string msg;
} g;


// Create the message queue named g.mq_name with capacity g.mq_maxmsg messages
// of at most g.mq_msgsize bytes each, then close the descriptor again.
//
// O_EXCL is not used, so an already existing queue is simply opened.
//
// Returns 0 on success, -1 if mq_open() or mq_close() fails (the reason is
// logged to stderr).
int do_create()
{
	struct mq_attr attr;
	memset(&attr, 0, sizeof(attr));
	attr.mq_maxmsg = g.mq_maxmsg;
	attr.mq_msgsize = g.mq_msgsize;

	// mq_open() requires an access mode in oflag; state it explicitly
	// instead of relying on O_RDONLY being zero.
	const mqd_t fd = mq_open(g.mq_name.c_str(), O_RDONLY | O_CREAT, 0666, &attr);
	if(fd == (mqd_t)-1) {
		log("mq_open() failed: %s", strerror(errno));
		return -1;
	}

	// Report close failures like do_send()/do_recv()/do_clear() do.
	if(mq_close(fd) < 0) {
		log("mq_close() failed: %s", strerror(errno));
		return -1;
	}

	return 0;
}


// Remove the message queue named g.mq_name via mq_unlink().
//
// Returns 0 on success, -1 if mq_unlink() fails (the reason is logged to stderr).
int do_delete()
{
	if(mq_unlink(g.mq_name.c_str()) < 0) {
		log("mq_unlink() failed: %s", strerror(errno));
		return -1;
	}
	return 0;
}


// Send g.msg (without a terminating NUL) to the queue named g.mq_name at
// priority 0.
//
// Returns 0 on success, -1 if mq_open(), mq_send() or mq_close() fails (the
// reason is logged to stderr).
int do_send()
{
	int rc = 0;

	// The mode and attr arguments of mq_open() are only consulted together
	// with O_CREAT, so they are not passed when opening an existing queue.
	const mqd_t fd = mq_open(g.mq_name.c_str(), O_WRONLY);
	if(fd == (mqd_t)-1) {
		log("mq_open() failed: %s", strerror(errno));
		return -1;
	}

	// Record the failure in the function-level rc. A second, block-local
	// "rc" here would hide the error from the caller.
	const unsigned int prio = 0;
	if(mq_send(fd, g.msg.c_str(), g.msg.size(), prio) < 0) {
		log("mq_send() failed: %s", strerror(errno));
		rc = -1;
	}

	// Always close the descriptor, even after a failed send.
	if(mq_close(fd) < 0) {
		log("mq_close() failed: %s", strerror(errno));
		rc = -1;
	}

	return rc;
}


// Receive one message from the queue named g.mq_name and print it to stdout
// as "rc=..., buf=[...] prio=...".
//
// The queue is opened without O_NONBLOCK, so mq_receive() waits when the
// queue is empty.
//
// Returns 0 on success, -1 if mq_open(), mq_getattr(), mq_receive() or
// mq_close() fails (the reason is logged to stderr).
int do_recv()
{
	int rc = 0;

	// open
	// The mode and attr arguments of mq_open() are only consulted together
	// with O_CREAT, so they are not passed when opening an existing queue.
	const mqd_t fd = mq_open(g.mq_name.c_str(), O_RDONLY);
	if(fd == (mqd_t)-1) {
		log("mq_open() failed: %s", strerror(errno));
		return -1;
	}

	// mq_receive() fails with EMSGSIZE when the buffer is smaller than the
	// queue's mq_msgsize, so size the buffer from the queue's own attributes.
	struct mq_attr attr;
	memset(&attr, 0, sizeof(attr));
	if(mq_getattr(fd, &attr) < 0) {
		log("mq_getattr() failed: %s", strerror(errno));
		rc = -1;
	}

	// receive
	if(rc == 0) {
		std::string buf(static_cast<size_t>(attr.mq_msgsize), '\0'); // no room reserved for a NUL terminator
		unsigned int prio = 0;
		const ssize_t sz = mq_receive(fd, &buf[0], buf.size(), &prio);
		if(sz < 0) {
			log("mq_receive() failed: %s", strerror(errno));
			rc = -1;
		} else {
			// Limit the printed text to the bytes actually received; the
			// precision argument of "%.*s" must be an int. rc is always 0
			// here and is printed only to keep the output format unchanged.
			printf("rc=%d, buf=[%.*s] prio=%u\n",
				   rc, static_cast<int>(sz), buf.data(), prio);
		}
	}

	// close
	if(mq_close(fd) < 0) {
		log("mq_close() failed: %s", strerror(errno));
		rc = -1;
	}

	return rc;
}


// Drain the queue named g.mq_name: receive and print messages until the queue
// is empty.
//
// Each message is printed to stdout as "rc=..., buf=[...] prio=...".
// Emptiness is detected by calling mq_timedreceive() with a deadline that has
// already passed, which fails with ETIMEDOUT instead of blocking.
//
// Returns 0 when the queue was drained, -1 if mq_open(), mq_getattr(),
// mq_timedreceive() (other than ETIMEDOUT) or mq_close() fails.
int do_clear()
{
	int rc = 0;

	// open
	// The mode and attr arguments of mq_open() are only consulted together
	// with O_CREAT, so they are not passed when opening an existing queue.
	const mqd_t fd = mq_open(g.mq_name.c_str(), O_RDONLY);
	if(fd == (mqd_t)-1) {
		log("mq_open() failed: %s", strerror(errno));
		return -1;
	}

	// The receive buffer must be at least the queue's mq_msgsize, otherwise
	// the receive call fails with EMSGSIZE; take the size from the queue.
	struct mq_attr attr;
	memset(&attr, 0, sizeof(attr));
	if(mq_getattr(fd, &attr) < 0) {
		log("mq_getattr() failed: %s", strerror(errno));
		rc = -1;
	}

	// clear
	if(rc == 0) {
		std::string buf(static_cast<size_t>(attr.mq_msgsize), '\0'); // no room reserved for a NUL terminator
		const struct timespec abs_ts = {0, 0}; // a time in the past
		for(;;) {
			unsigned int prio = 0;
			const ssize_t sz = mq_timedreceive(fd, &buf[0], buf.size(), &prio, &abs_ts);
			if(sz < 0) {
				if(errno == ETIMEDOUT) {
					log("mq_timedreceive() ETIMEOUT (OK, queue is empty.)");
				} else {
					log("mq_timedreceive() failed: %s", strerror(errno));
					rc = -1;
				}
				break;
			}

			// Print only the bytes of this message. The buffer is reused, so
			// bytes beyond sz may belong to an earlier, longer message.
			// rc is always 0 here and is printed only to keep the output
			// format unchanged.
			printf("rc=%d, buf=[%.*s] prio=%u\n",
				   rc, static_cast<int>(sz), buf.data(), prio);
		}
	}

	// close
	if(mq_close(fd) < 0) {
		log("mq_close() failed: %s", strerror(errno));
		rc = -1;
	}

	return rc;
}


// Run the action selected in g.todo.
//
// Returns 0 when the action succeeded, -1 when it failed or when g.todo holds
// no known action (both cases are logged to stderr).
int do_main()
{
	switch(g.todo) {
	case CREATE:
		if(do_create() < 0) {
			log("do_create() failed");
			return -1;
		}
		return 0;

	case DELETE:
		if(do_delete() < 0) {
			log("do_delete() failed");
			return -1;
		}
		return 0;

	case SEND:
		if(do_send() < 0) {
			log("do_send() failed");
			return -1;
		}
		return 0;

	case RECV:
		if(do_recv() < 0) {
			log("do_recv() failed");
			return -1;
		}
		return 0;

	case CLEAR:
		if(do_clear() < 0) {
			log("do_clear() failed");
			return -1;
		}
		return 0;

	default:
		break;
	}

	// Reached only when no case above matched.
	log("logic error");
	return -1;
}



// Strictly parse a decimal integer option argument.
// Returns true and stores the value in *out only when the whole string is a
// number that fits in an int; otherwise returns false and leaves *out untouched.
static bool parse_int_arg(const char *text, int *out)
{
	char *end = NULL;
	errno = 0;
	const long value = strtol(text, &end, 10);
	if(end == text || *end != '\0' || errno == ERANGE) {
		return false;
	}
	// Reject values that do not survive the round trip through int.
	if(static_cast<long>(static_cast<int>(value)) != value) {
		return false;
	}
	*out = static_cast<int>(value);
	return true;
}


// Parse the command line into the global settings object `g` and validate it.
//
// Options: -n MQ_MAXMSG, -s MQ_MSGSIZE, -q MQ_NAME, -m MSG, and one action
// among -C (create), -D (delete), -S (send), -R (receive), -L (clear).
// When several action options are given, the last one wins.
//
// Returns 0 when the settings are usable, -1 otherwise (each problem is
// logged to stderr).
int init(int argc, char *argv[])
{
	int rc = 0;
	g.cmd_name = argv[0];

	for(;;) {
		int opt = getopt(argc, argv, "n:s:q:m:CDSRL");
		if(opt == -1) {
			break;
		}

		switch(opt) {
		case 'n':
			// atoi() would silently accept input such as "5x" or "abc".
			if(!parse_int_arg(optarg, &g.mq_maxmsg)) {
				log("invalid MQ_MAXMSG: %s", optarg);
				return -1;
			}
			break;
		case 's':
			if(!parse_int_arg(optarg, &g.mq_msgsize)) {
				log("invalid MQ_MSGSIZE: %s", optarg);
				return -1;
			}
			break;
		case 'q':
			g.mq_name = std::string(optarg);
			break;
		case 'm':
			g.msg = std::string(optarg);
			break;
		case 'C':
			g.todo = CREATE;
			break;
		case 'D':
			g.todo = DELETE;
			break;
		case 'S':
			g.todo = SEND;
			break;
		case 'R':
			g.todo = RECV;
			break;
		case 'L':
			g.todo = CLEAR;
			break;
		case '?': // fall through
		default:
			log("opt error");
			return -1;
		}
	}

	// Per-action requirements.
	switch(g.todo) {
	case UNKNOWN:
		log("no opt");
		return -1;

	case CREATE:
		if(g.mq_maxmsg <= 0) {
			log("need mq_maxmsg");
			rc = -1;
		}
		if(g.mq_msgsize <= 0) {
			log("need mq_msgsize");
			rc = -1;
		}
		break;

	case DELETE:
		break;

	case SEND:
		if(g.msg.empty()) {
			log("need msg");
			rc = -1;
		}
		break;

	case RECV:
		break;

	case CLEAR:
		break;
	}

	// Requirements common to every action.
	if(g.mq_name.empty()) {
		log("need mq_name");
		rc = -1;
	}
	if(MY_MAX_MSGSIZE < g.mq_msgsize) {
		log("%d < MQ_MSGSIZE (too large)", MY_MAX_MSGSIZE);
		rc = -1;
	}

	return rc;
}


// Finalisation hook called by main() after the action has run.
// Currently empty: there is nothing to release.
void fin()
{
}


// Print the command name (g.cmd_name) followed by one line per supported
// option to stdout, flushing after every line.
void usage()
{
	static const char *const option_lines[] = {
		"\t-n MQ_MAXMSG",
		"\t-s MQ_MSGSIZE",
		"\t-q MQ_NAME",
		"\t-m MSG",
		"\t-C" "\t\t" "... Create MQ",
		"\t-D" "\t\t" "... Delete MQ",
		"\t-R" "\t\t" "... Recv from MQ",
		"\t-S" "\t\t" "... Send to MQ",
		"\t-L" "\t\t" "... cLear MQ",
	};
	const size_t line_count = sizeof(option_lines) / sizeof(option_lines[0]);

	std::cout << g.cmd_name << std::endl;
	for(size_t i = 0; i < line_count; ++i) {
		std::cout << option_lines[i] << std::endl;
	}
}


// Entry point: parse the command line, run the selected action, and map the
// outcome to the process exit status (EXIT_FAILURE when any step returned a
// negative value, EXIT_SUCCESS otherwise).
int main(int argc, char *argv[])
{
	int status = init(argc, argv);

	if(status == 0) {
		status = do_main();
		if(status < 0) {
			log("do_main() failed");
		}
	} else if(status < 0) {
		// Bad command line: show the help text before reporting the failure.
		usage();
		log("init() failed");
	}

	fin();

	return (status < 0) ? EXIT_FAILURE : EXIT_SUCCESS;
}