/* Pekka "PQ" Paalanen
 * 15.02.2004
 *
 * Threads message passing demo
 *
 * compile with: gcc -Wall -W -O2 -g -pthread thread_msg.c -o thread_msg -lm
 */


#include <pthread.h>
#include <math.h> /* modf() */
#include <sys/time.h> /* gettimeofday() */
#include <errno.h> /* ETIMEDOUT */

/* includes for the example */
#include <stdio.h> /* printf() */
#include <time.h> /* nanosleep() */
#include <stdlib.h> /* random() */



/* msg_type value that marks a message device as holding no message */
#define EMPTY_MESSAGE	(-1)

/* status codes returned by the message device functions */
#define RET_OK		0
#define RET_FAIL	1

/* A message device: a slot holding at most one message (type + data
 * pointer) that threads fill and empty through the functions below. */
typedef struct tag_msgdevice
{
	/* locked by every send/receive function before touching the slot */
	pthread_mutex_t mutex;
	/* broadcast each time the slot is filled or emptied */
	pthread_cond_t cond;
	/* EMPTY_MESSAGE while the slot is free, otherwise the message type */
	int msg_type;
	/* pointer handed over with the message; stored and returned as-is,
	 * reset to NULL when the message is taken */
	void *data;
} msgdevice_t;


/* Blocking send: waits until *target holds no message, then stores
 * msg_type and data in it and wakes all threads waiting on the device.
 * msg_type must differ from EMPTY_MESSAGE, which marks an empty device.
 * Returns RET_OK once the message has been stored. */
int bsend_msg(msgdevice_t *target, int msg_type, void *data);

/* Blocking receive: waits until *src holds a message, copies its type
 * to *pmsg_type and, when pdata is not NULL, its data pointer to *pdata.
 * The device is then emptied and all waiting threads are woken.
 * Returns RET_OK. */
int brecv_msg(msgdevice_t *src, int *pmsg_type, void **pdata);

/* Non-blocking receive: takes the message as brecv_msg() does when one
 * is present and returns RET_OK; otherwise returns RET_FAIL at once and
 * leaves *pmsg_type and *pdata untouched. */
int nbrecv_msg(msgdevice_t *src, int *pmsg_type, void **pdata);

/* Blocking receive with a timeout given in seconds (tout).
 * Returns RET_OK when a message was taken, RET_FAIL when the wait ended
 * without one; in the latter case *pmsg_type and *pdata are untouched. */
int tbrecv_msg(msgdevice_t *src, double tout, int *pmsg_type, void **pdata);

/* Initializer: sets up the condition variable and mutex of *dev and
 * marks the device empty. Returns RET_OK on success and RET_FAIL on
 * failure (e.g. dev is NULL). */
int init_msgdevice(msgdevice_t *dev);

/* Destroyer, needs fixing, not recommended to be used.
 * Returns RET_FAIL when dev is NULL or when destroying the condition
 * variable or the mutex fails, RET_OK otherwise. */
int destroy_msgdevice(msgdevice_t *dev);




/* Blocking message send.
 *
 * Waits until *target is empty, stores (msg_type, data) in it and wakes
 * every thread waiting on the device so that a receiver can pick the
 * message up.
 *
 * target   - device to deliver to
 * msg_type - message type; must not be EMPTY_MESSAGE
 * data     - pointer passed through to the receiver unchanged
 *
 * Returns RET_OK when the message was stored, RET_FAIL when msg_type is
 * EMPTY_MESSAGE (nothing is stored in that case).
 */
int bsend_msg(msgdevice_t *target, int msg_type, void *data)
{
	/* EMPTY_MESSAGE is the "slot is free" marker: storing it would leave
	 * the device looking empty, no receiver would ever see the message
	 * and the data pointer would be overwritten by the next sender. */
	if(msg_type == EMPTY_MESSAGE)
		return RET_FAIL;

	/* lock the mutex to get access to *target */
	pthread_mutex_lock(&target->mutex);

	/* if it is not empty, wait until it is; the loop re-checks the
	 * slot after every wakeup because the broadcast wakes senders and
	 * receivers alike */
	while(target->msg_type != EMPTY_MESSAGE)
	{
		pthread_cond_wait(&target->cond, &target->mutex);
	}

	/* now the target is empty and we have the locked mutex */
	target->msg_type = msg_type;
	target->data = data;

	/* signal all other waiting threads to awake */
	pthread_cond_broadcast(&target->cond);

	/* unlock the mutex */
	pthread_mutex_unlock(&target->mutex);

	return RET_OK;
}


/* Blocking receive: sleeps until *src holds a message, hands the message
 * to the caller, empties the device and wakes all waiting threads.
 *
 * src       - device to read from
 * pmsg_type - receives the message type
 * pdata     - receives the data pointer; may be NULL if not wanted
 *
 * Always returns RET_OK.
 */
int brecv_msg(msgdevice_t *src, int *pmsg_type, void **pdata)
{
	pthread_mutex_t *lock = &src->mutex;
	pthread_cond_t *changed = &src->cond;

	/* all access to *src happens with its mutex held */
	pthread_mutex_lock(lock);

	/* while the device is empty, wait until it is not */
	while(src->msg_type == EMPTY_MESSAGE)
		pthread_cond_wait(changed, lock);

	/* a message is present and the mutex is ours: copy it out */
	*pmsg_type = src->msg_type;
	if(pdata != NULL)
		*pdata = src->data;

	/* mark the device empty again */
	src->data = NULL;
	src->msg_type = EMPTY_MESSAGE;

	/* let blocked senders (and other waiters) re-check the device */
	pthread_cond_broadcast(changed);
	pthread_mutex_unlock(lock);

	return RET_OK;
}


/* Non-blocking receive: takes the message from *src if there is one,
 * otherwise returns immediately.
 *
 * src       - device to read from
 * pmsg_type - receives the message type (only written on success)
 * pdata     - receives the data pointer (only written on success);
 *             may be NULL if not wanted
 *
 * Returns RET_OK when a message was taken, RET_FAIL when the device was
 * empty.
 */
int nbrecv_msg(msgdevice_t *src, int *pmsg_type, void **pdata)
{
	int status = RET_FAIL;

	/* all access to *src happens with its mutex held */
	pthread_mutex_lock(&src->mutex);

	if(src->msg_type != EMPTY_MESSAGE)
	{
		/* copy the message out */
		*pmsg_type = src->msg_type;
		if(pdata != NULL)
			*pdata = src->data;

		/* mark the device empty again */
		src->data = NULL;
		src->msg_type = EMPTY_MESSAGE;

		/* the device changed state, so wake everyone waiting on it */
		pthread_cond_broadcast(&src->cond);

		status = RET_OK;
	}

	pthread_mutex_unlock(&src->mutex);

	return status;
}


/* Blocking receive with a timeout of 'tout' seconds; may return without
 * a message.
 *
 * src       - device to read from
 * tout      - maximum time to wait, in seconds; values that are not
 *             greater than zero (including NaN) mean "do not wait".
 *             Must be finite and small enough for the resulting
 *             deadline to fit in time_t.
 * pmsg_type - receives the message type (only written on success)
 * pdata     - receives the data pointer (only written on success);
 *             may be NULL if not wanted
 *
 * Returns RET_OK when a message was taken, RET_FAIL when the wait ended
 * (timeout or error from pthread_cond_timedwait) without a message.
 */
int tbrecv_msg(msgdevice_t *src, double tout, int *pmsg_type, void **pdata)
{
	struct timeval now;
	struct timespec deadline;
	double whole;
	double frac;
	long nsec;
	int err = 0;
	int status = RET_FAIL;

	if(!(tout > 0.0))
		tout = 0.0;

	/* Build the absolute deadline. modf() RETURNS the fractional part
	 * and stores the integral part through its pointer argument. */
	gettimeofday(&now, NULL);
	frac = modf(tout, &whole);
	deadline.tv_sec = now.tv_sec + (time_t)whole;

	/* tv_usec < 1e6 and frac < 1, so the sum stays below 2e9 and fits
	 * in a 32-bit long; carry at most one second to keep tv_nsec in
	 * the [0, 1e9) range that pthread_cond_timedwait() requires */
	nsec = (long)now.tv_usec * 1000L + (long)(frac * 1000000000.0);
	if(nsec >= 1000000000L)
	{
		nsec -= 1000000000L;
		deadline.tv_sec += 1;
	}
	deadline.tv_nsec = nsec;

	/* lock the mutex to get access to *src */
	pthread_mutex_lock(&src->mutex);

	/* while the device is empty, wait until it is not or until the
	 * wait fails. Any error ends the loop, not just ETIMEDOUT:
	 * retrying after e.g. EINVAL would spin forever. */
	while((src->msg_type == EMPTY_MESSAGE) && (err == 0))
	{
		err = pthread_cond_timedwait(&src->cond, &src->mutex,
		   &deadline);
	}

	/* we have the locked mutex, check for message; a message that
	 * arrived together with the timeout is still delivered */
	if(src->msg_type != EMPTY_MESSAGE)
	{
		*pmsg_type = src->msg_type;
		if(pdata)
			*pdata = src->data;

		/* reset msg device */
		src->msg_type = EMPTY_MESSAGE;
		src->data = NULL;

		/* signal all other waiting threads to awake */
		pthread_cond_broadcast(&src->cond);

		status = RET_OK;
	}

	/* unlock the mutex */
	pthread_mutex_unlock(&src->mutex);

	return status;
}


/* Initializes a message device: sets up its condition variable and
 * mutex with default attributes and marks the slot empty.
 *
 * Returns RET_OK on success. Returns RET_FAIL when dev is NULL or when
 * either pthread object cannot be initialized; in that case nothing
 * is left initialized in *dev.
 */
int init_msgdevice(msgdevice_t *dev)
{
	if(!dev)
		return RET_FAIL;

	/* init cond device with default attributes */
	if(pthread_cond_init(&dev->cond, NULL) != 0)
		return RET_FAIL;

	/* init mutex; undo the cond init on failure so the caller is not
	 * left with a half-initialized device */
	if(pthread_mutex_init(&dev->mutex, NULL) != 0)
	{
		pthread_cond_destroy(&dev->cond);
		return RET_FAIL;
	}

	/* reset message */
	dev->msg_type = EMPTY_MESSAGE;
	dev->data = NULL;

	return RET_OK;
}

/* Destroys the condition variable and then the mutex of *dev.
 *
 * Returns RET_FAIL when dev is NULL or when either destroy call reports
 * an error, RET_OK otherwise. Known to be incomplete, see the FIXME
 * notes below.
 */
int destroy_msgdevice(msgdevice_t *dev)
{
	if(dev == NULL)
		return RET_FAIL;

	/* FIXME: a failure here means there are threads waiting on the
	 * cond */
	if(pthread_cond_destroy(&dev->cond) != 0)
		return RET_FAIL;

	/* FIXME: what should be done when cond is destroyed, but
	 * mutex is still locked? */
	if(pthread_mutex_destroy(&dev->mutex) != 0)
		return RET_FAIL;

	return RET_OK;
}


/*************** CUT HERE *****************/

/* example begins */



/* Per-thread data of the example: one instance is handed to each
 * worker thread and is also sent to main as the message payload. */
typedef struct
{
	/* the worker's own device; main sends its replies here */
	msgdevice_t mydev;
	/* device of the main thread; the worker sends its messages here */
	msgdevice_t *parentdev;
	/* pthread handle filled in by pthread_create() */
	pthread_t ptid;
	/* small index used to identify the worker in the printed output */
	int tid;
} thdata_t;


/* Worker thread body of the example.
 *
 * ptr points to this worker's thdata_t. In an endless loop the worker
 * sends a random number (0..19) to the main thread, with its own
 * thdata_t as the message data so main knows whom to answer, waits for
 * the reply on its own device, and then sleeps for four seconds.
 */
void *threadfunc(void *ptr)
{
	thdata_t *self = ptr;
	struct timespec nap;
	int value;
	int reply;

	nap.tv_nsec = 0;
	nap.tv_sec = 4;

	for(;;)
	{
		value = random() % 20;

		printf("Th-%d sending %d to Main\n", self->tid, value);
		bsend_msg(self->parentdev, value, self);
		printf("Th-%d done sending.\n", self->tid);

		/* only the reply type matters, the data pointer is ignored */
		brecv_msg(&self->mydev, &reply, NULL);
		printf("Th-%d got reply %d\n", self->tid, reply);

		nanosleep(&nap, NULL);
		/* stopping condition missing */
	}

	return NULL;
}


/* number of worker threads started by the example */
#define NTHREADS	4

/* Example driver: starts NTHREADS workers and then serves them forever.
 * Each worker message carries a pointer to the sender's thdata_t, which
 * main uses to send the reply (type 99) to that worker's own device.
 * When no message arrives within 1.5 seconds a notice is printed.
 */
int main(void)
{
	thdata_t thds[NTHREADS];
	msgdevice_t mdev;
	int i;
	int ret;
	int msgt;
	void *payload;
	thdata_t *pth;

	if(init_msgdevice(&mdev) != RET_OK)
	{
		fprintf(stderr, "Main: cannot initialize message device\n");
		return EXIT_FAILURE;
	}

	for(i=0; i<NTHREADS; i++)
	{
		if(init_msgdevice(&thds[i].mydev) != RET_OK)
		{
			fprintf(stderr,
			   "Main: cannot initialize device of Th-%d\n", i);
			return EXIT_FAILURE;
		}
		thds[i].parentdev = &mdev;
		thds[i].tid = i;

		/* pthread_create() returns an error number, not -1/errno */
		ret = pthread_create(&thds[i].ptid, NULL, threadfunc,
		   &thds[i]);
		if(ret != 0)
		{
			fprintf(stderr,
			   "Main: cannot create Th-%d (error %d)\n", i, ret);
			return EXIT_FAILURE;
		}
	}


	while(1)
	{
		/* Receive into a real void * and convert afterwards; writing
		 * a thdata_t * through a void ** is not valid C.
		 * nbrecv_msg(&mdev, &msgt, &payload) followed by a short
		 * sleep could be used here instead to poll. */
		payload = NULL;
		ret = tbrecv_msg(&mdev, 1.5, &msgt, &payload);
		if(ret == RET_OK)
		{
			pth = payload;
			printf("Main: got message %d from Th-%d\n",
			   msgt, pth->tid);
			bsend_msg(&pth->mydev, 99, NULL);
			printf("Main: replied 99.\n");
		}
		else
			printf("Main: no message :-(\n");

		/* stopping condition missing */
	}

	/* not reached until a stopping condition is added above */
	for(i=0; i<NTHREADS; i++)
	{
		pthread_join(thds[i].ptid, NULL);
	}

	return 0;
}




