/*
    Buffer.  Very fast reblocking filter for speedy writing of tapes.
    Copyright (C) 1990,1991  Lee McLoughlin

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 1, or (at your option)
    any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.

    Lee McLoughlin.
    Dept of Computing, Imperial College,
    180 Queens Gate, London, SW7 2BZ, UK.

    Email: L.McLoughlin@doc.ic.ac.uk
*/

/* This is a reblocking process, designed to try and read from stdin
 * and write to stdout - but to always try and keep the writing side
 * busy.  It is meant to try and stream tape writes.
 *
 * This program runs in two parts.  The reader and the writer.  They
 * communicate using shared memory with semaphores locking the access.
 * The shared memory implements a circular list of blocks of data.
 *
 * L.McLoughlin, Imperial College, 1990
 *
 * $Log: buffer.c,v $
 * Revision 1.15  1992/11/23  23:32:58  lmjm
 * Oops!  This should be outside the ifdef
 *
 * Revision 1.14  1992/11/23  23:29:58  lmjm
 * allow MAX_BLOCKSIZE and DEF_SHMEM to be configured
 *
 * Revision 1.13  1992/11/23  23:22:29  lmjm
 * Printf's use %lu where appropriate.
 *
 * Revision 1.12  1992/11/23  23:17:55  lmjm
 * Got rid of floats and use Kbyte counters instead.
 *
 * Revision 1.11  1992/11/03  23:11:51  lmjm
 * Forgot Andi Karrer on the patch list.
 *
 * Revision 1.10  1992/11/03  22:58:41  lmjm
 * Cleaned up the debugging prints.
 *
 * Revision 1.9  1992/11/03  22:53:00  lmjm
 * Corrected stdin, stout and showevery use.
 *
 * Revision 1.8  1992/11/03  22:41:34  lmjm
 * Added 2Gig patches from:
 * Andi Karrer <karrer@bernina.ethz.ch>
 * Rumi Zahir <rumi@iis.ethz.ch>
 * Christoph Wicki <wicki@iis.ethz.ch>
 *
 * Revision 1.7  1992/07/23  20:42:03  lmjm
 * Added 't' option to print total writen at end.
 *
 * Revision 1.6  1992/04/07  19:57:30  lmjm
 * Added Kevins -B and -p options.
 * Turn off buffering to make -S output appear ok.
 * Added GPL.
 *
 * Revision 1.5  90/07/22  18:46:38  lmjm
 * Added system 5 support.
 * 
 * Revision 1.4  90/07/22  18:29:48  lmjm
 * Updated arg handling to be more consistent.
 * Make sofar printing size an option.
 * 
 * Revision 1.3  90/05/15  23:27:46  lmjm
 * Added -S option (show how much has been writen).
 * Added -m option to specify how much shared memory to grab.
 * Now tries to fill this with blocks.
 * reader waits for writer to terminate and then frees the shared mem and sems.
 * 
 * Revision 1.2  90/01/20  21:37:59  lmjm
 * Reset default number of  blocks and blocksize for best thruput of
 * standard tar 10K blocks.
 * Allow number of blocks to be changed.
 * Don't need a hole in the circular queue since the semaphores prevent block
 * clash.
 * 
 * Revision 1.1  90/01/17  11:30:23  lmjm
 * Initial revision
 * 
 */
#include <unistd.h>
#include <stdio.h>
#include <signal.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>

/* RCS identification string for this file.  It is left out when the
 * file is processed with `lint' defined. */
#ifndef lint
static char *rcsid = "$Header: /a/swan/home/swan/staff/csg/lmjm/src/buffer/RCS/buffer.c,v 1.15 1992/11/23 23:32:58 lmjm Exp lmjm $";
#endif

/* shmat() is declared by <sys/shm.h>, included above, as returning
 * void *.  A private `extern char *shmat();' clashes with that
 * prototype, so no local declaration is made here. */

/* General macros.
 * K is a postfix multiplier: `10 K' expands to `10 *1024' and
 * `1 K K' to `1 *1024 *1024'.  The expansion carries no parentheses of
 * its own, so users that need them supply them (see MAX_BLOCKSIZE). */
#define TRUE 1
#define FALSE 0
#define K *1024

/* Forward declarations for the functions that do not return int and
 * are called before their definitions. */
void byee();
void start_reader_and_writer();

/* Progress reporting interval.  Zero means no progress reports.
 * parse_args() stores a byte count here (-S, or the block size for -z);
 * write_block_to_stdout() divides it by 1024 on first use, so from then
 * on it is held in Kbytes.  PRINT_EVERY is the value used when the -S
 * argument is not positive. */
int showevery = 0;
#define PRINT_EVERY 10 K

/* Pause after every write: the -u value, passed to usleep() by
 * write_block_to_stdout() when non-zero. */
unsigned write_pause;

/* This is the inter-process buffer - it implements a circular list
 * of blocks. */

/* Size in bytes of one block (-s / -z).  parse_args() rejects values
 * that are not in the range 1..MAX_BLOCKSIZE. */
#ifndef MAX_BLOCKSIZE
#define MAX_BLOCKSIZE (512 K)
#endif
#define DEF_BLOCKSIZE (10 K)
int blocksize = DEF_BLOCKSIZE;

/* Number of blocks in the queue (-b, at most MAX_BLOCKS).  When -b is
 * not given parse_args() works it out from max_shmem and blocksize.
 */
#define MAX_BLOCKS 2048
int blocks = 1;
/* Circular increment of a buffer index: the index after i, wrapping to
 * 0 at `blocks'.  The argument is evaluated more than once. */
#define INC(i) (((i)+1) == blocks ? 0 : ((i)+1))

/* Max amount of shared memory you can allocate - can't see a way to look
 * this up.  Settable with -m.
 */
#ifndef DEF_SHMEM
#define DEF_SHMEM (1 K K)
#endif
int max_shmem = DEF_SHMEM;

/* Just a flag to show unfilled */
#define NONE (-1)

/* The shared memory id of the buffer, as returned by shmget(); NONE
 * until buffer_allocate() has created the segment. */
int buffer_id = NONE;
/* One slot of the queue.
 *   bytes - amount of data in the block; the writer treats 0 as
 *           end of input (see data_to_write()).
 *   data  - start of this block's bytes inside data_space, filled in
 *           by get_buffer().
 * curr_block points at the slot most recently picked by
 * get_next_free_block(), get_next_filled_block() or
 * write_blocks_to_stdout(). */
struct block {
	int bytes;
	char *data;
} *curr_block;

/* Value of pbuffer while no segment is attached; also what the result
 * of shmat() is compared with to detect failure. */
#define NO_BUFFER ((struct buffer *)-1)
struct buffer {
	/* writer will hang trying to lock this till reader fills in a block */
	int blocks_used_lock;
	/* reader will hang trying to lock this till writer empties a block */
	int blocks_free_lock;

	/* Index of the slot get_next_free_block() hands out next */
	int next_block_in;
	/* Index of the slot the writer takes next */
	int next_block_out;

	struct block block[ MAX_BLOCKS ];

	/* The actual space for the blocks is here - the array extends
	 * past its declared single element; buffer_allocate() sizes the
	 * segment to hold blocks * blocksize bytes of it. */
	char data_space[ 1 ];
} *pbuffer = NO_BUFFER;
/* Size in bytes of the shared memory segment, set by buffer_allocate() */
int buffer_size;

/* Descriptors read from and written to; replaced by -i / -o */
int fdin	= 0;
int fdout	= 1;
/* S_ISCHR() of fdin / fdout, recorded at the end of parse_args() */
int in_ISCHR	= 0;
int out_ISCHR	= 0;
/* -B: fill_block() zero-fills a short last block up to blocksize */
int padblock	= FALSE;
/* Result of fork() in the parent (the reader); stays 0 in the child
 * (the writer).  byee() uses it to decide who removes the buffer. */
int writer_pid	= 0;
/* Set to getppid() in the child; not read anywhere in this file */
int reader_pid	= 0;
/* -p: percentage of the blocks that must be filled before the writer
 * starts writing them out */
int percent	= 0;
/* -d: debug level, one more for every -d given */
int debug	= 0;
/* -Z: lseek() character devices back to 0 as described in
 * write_block_to_stdout() */
int Zflag	= 0;
/* Name used in the usage message */
char *progname = "buffer";

/* -t: print the total written when the writer finishes */
char print_total = 0;
/* Number of K output */
unsigned long outk = 0;

/*
 * Program entry point.
 *
 * Parses the options, installs the signal handlers, creates the shared
 * buffer and then splits into the reader and the writer.  Both
 * processes come back here from start_reader_and_writer() and leave
 * through byee( 0 ).
 *
 * argc, argv: the command line.
 *
 * Declared int explicitly: an implicit return type is not valid C
 * from C99 onwards.
 */
int
main( argc, argv )
	int argc;
	char **argv;
{
	parse_args( argc, argv );

	set_handlers();

	buffer_allocate();

	start_reader_and_writer();

	byee( 0 );

	/* Not reached - byee() calls exit().  Keeps every path through
	 * main returning a value. */
	return 0;
}

/*
 * Parse the command line into the global option variables.
 *
 * argc, argv: as handed to main().
 *
 * Besides setting the option globals this may open the -i / -o files
 * (replacing fdin / fdout), works out `blocks' from max_shmem and
 * blocksize when -b was not given, and records whether fdin / fdout are
 * character special files.  An unknown option or an out of range value
 * prints a message on stderr and leaves through byee( -1 ).
 */
parse_args( argc, argv )
	int argc;
	char **argv;
{
	int c;
	int iflag = 0;
	int oflag = 0;
	int zflag = 0;
	extern char *optarg;
	extern int optind;
	char blocks_given = FALSE;
	struct stat buf;


	while( (c = getopt( argc, argv, "BS:Zdm:s:b:p:u:ti:o:z:" )) != -1 ){
		switch( c ){
		case 't': /* Print to stderr the total no of bytes written */
			print_total++;
			break;
		case 'u': /* pause after write for given microseconds */
			write_pause = atoi( optarg );
			break;
		case 'B':   /* Pad last block */
			padblock = TRUE;
			break;
		case 'Z':   /* Zero by lseek on the tape device */
			Zflag = TRUE;
			break;
		case 'i': /* Input file */
			iflag++;
			if( iflag > 1 ){
				fprintf( stderr, "buffer: -i given twice\n" );
				byee( -1 );
			}
			if( (fdin = open( optarg, O_RDONLY )) < 0 ){
				perror( "buffer: cannot open input file" );
				fprintf( stderr, "filename: %s\n", optarg );
				byee ( -1 );
			}
			break;
		case 'o': /* Output file */
			oflag++;
			if( oflag > 1 ){
				fprintf( stderr, "buffer: -o given twice\n" );
				byee( -1 );
			}
			if( (fdout = open( optarg, O_WRONLY | O_CREAT | O_TRUNC, 0666 )) < 0 ){
				perror( "buffer: cannot open output file" );
				fprintf( stderr, "filename: %s\n", optarg );
				byee ( -1 );
			}
			break;
		case 'S':
			/* Show every once in a while how much is printed */
			showevery = do_size( optarg );
			if( showevery <= 0 )
				showevery = PRINT_EVERY;
			break;
		case 'd':	/* debug */
			debug++;
			if( debug == 1 ){
				setbuf( stdout, NULL );
				setbuf( stderr, NULL );
				fprintf( stderr, "debugging turned on\n" );
			}
			break;
		case 'm':
			/* Max size of shared memory lump */
			max_shmem = do_size( optarg );

			/* Checked against the block count and size known
			 * at this point of the option list.  The sign is
			 * tested first: comparing a negative int with the
			 * unsigned sizeof expression would convert it to a
			 * huge value and let it through. */
			if( max_shmem <= 0 ||
			    (unsigned long)max_shmem <
				sizeof( struct buffer ) + ((unsigned long)blocksize * blocks) ){
				fprintf( stderr, "max_shmem %d too low\n", max_shmem );
				byee( -1 );
			}
			break;
		case 'b':
			/* Number of blocks */
			blocks_given = TRUE;
			blocks = atoi( optarg );
			if( (blocks <= 0) || (MAX_BLOCKS < blocks) ){
				fprintf( stderr, "blocks %d out of range\n", blocks );
				byee( -1 );
			}
			break;
		case 'p':	/* percent to wait before dumping */
			percent = atoi( optarg );

			if( (percent < 0) || (100 < percent) ){
				fprintf( stderr, "percent %d out of range\n", percent );
				byee( -1 );
			}
			if( debug )
				fprintf( stderr, "percent set to %d\n", percent );
			break;
		case 'z':
			zflag++;
			/* FALL THRU */
		case 's':	/* Size of a block */
			blocksize = do_size( optarg );

			if( (blocksize <= 0) || (MAX_BLOCKSIZE < blocksize) ){
				fprintf( stderr, "blocksize %d out of range\n", blocksize );
				byee( -1 );
			}
			break;
		default:
			/* Every option accepted by the getopt() string above
			 * is listed, including -Z and -d. */
			fprintf( stderr, "Usage: %s [-B] [-Z] [-d] [-t] [-S size] [-m memsize] [-b blocks] [-p percent] [-s blocksize] [-u pause] [-i infile] [-o outfile] [-z size]\n",
				progname );
			fprintf( stderr, "-B = blocked device - pad out last block\n" );
			fprintf( stderr, "-Z = lseek character devices back to zero every 1Gbyte written\n" );
			fprintf( stderr, "-d = print debugging output (repeat for more)\n" );
			fprintf( stderr, "-t = show total amount writen at end\n" );
			fprintf( stderr, "-S size = show amount writen every size bytes\n" );
			fprintf( stderr, "-m size = size of shared mem chunk to grab\n" );
			fprintf( stderr, "-b num = number of blocks in queue\n" );
			fprintf( stderr, "-p percent = don't start writing until percent blocks filled\n" );
			fprintf( stderr, "-s size = size of a block\n" );
			fprintf( stderr, "-u usecs = microseconds to sleep after each write\n" );
			fprintf( stderr, "-i infile = file to read from\n" );
			fprintf( stderr, "-o outfile = file to write to\n" );
			fprintf( stderr, "-z size = combined -S/-s flag\n" );
			byee( -1 );
		}
	}

	/* -z: report progress once per block */
	if (zflag) showevery = blocksize;

	/* If -b was not given try and work out the max buffer size */
	if( !blocks_given ){
		/* Signed arithmetic: when max_shmem is smaller than the
		 * buffer header the result must go negative (and be
		 * rejected just below) instead of wrapping round as an
		 * unsigned value. */
		blocks = ((long)max_shmem - (long)sizeof( struct buffer )) / blocksize;
		if( blocks <= 0 ){
			fprintf( stderr, "Cannot handle blocks that big, aborting!\n" );
			byee( -1 );
		}
		if( MAX_BLOCKS < blocks  ){
			fprintf( stderr, "Cannot handle that many blocks, aborting!\n" );
			byee( -1 );
		}
	}

	/* check if fdin or fdout are character special files */
	if( fstat( fdin, &buf ) != 0 ){
		perror( "buffer: can't stat input file" );
		byee( -1 );
	}
	in_ISCHR = S_ISCHR( buf.st_mode );
	if( fstat( fdout, &buf ) != 0 ){
		perror( "buffer: can't stat output file" );
		byee( -1 );
	}
	out_ISCHR = S_ISCHR( buf.st_mode );
}

/*
 * The interrupt handler, installed by set_handlers() for SIGHUP,
 * SIGINT, SIGQUIT and SIGTERM.
 *
 * sig: number of the signal being delivered (not used).
 *
 * Leaves the program through byee( -1 ).  Declared void with an int
 * argument so that its type matches what signal() expects of a handler.
 */
void
shutdown( sig )
	int sig;
{
	(void) sig;

	byee( -1 );
}

/*
 * Route the terminating signals to shutdown().
 *
 * Called once from main() and again in the forked child.
 */
set_handlers()
{
	static int caught[] = { SIGHUP, SIGINT, SIGQUIT, SIGTERM };
	int n;

	for( n = 0; n < (int)(sizeof( caught ) / sizeof( caught[ 0 ] )); n++ )
		signal( caught[ n ], shutdown );

	/* This is the reader - pass a SIGTERM on to the writer.
	 * NOTE(review): at both calls visible in this file (main() before
	 * the fork, and the forked child) writer_pid is still 0, so this
	 * does not appear to fire - confirm whether the propagation was
	 * meant to happen in the handler itself. */
	if( writer_pid != 0 ){
		kill( writer_pid, SIGTERM );
	}
}

/*
 * Create the shared memory segment and the two semaphores kept in it.
 *
 * Sets buffer_size and buffer_id.  The segment is attached only for as
 * long as it takes to initialise it and is detached again before
 * returning, leaving pbuffer at NO_BUFFER.  A failure to create the
 * segment is reported and leaves through byee( -1 ).
 */
buffer_allocate()
{
	/* Allow for the data space.  One char of it is already part of
	 * struct buffer (data_space[ 1 ]). */
	buffer_size = sizeof( struct buffer ) +
		((blocks * blocksize) - sizeof( char ));

	/* Create the space for the buffer */
	buffer_id = shmget( IPC_PRIVATE,
			   buffer_size,
			   IPC_CREAT|S_IREAD|S_IWRITE );
	if( buffer_id < 0 ){
		perror( "buffer: couldn't create shared memory segment" );
		byee( -1 );
	}

	get_buffer();

	/* A pointer has to be printed with %p: handing it to %x is
	 * undefined and loses the top half of the address where pointers
	 * are wider than unsigned int. */
	if( debug )
		fprintf( stderr, "pbuffer is %p, buffer_size is %d [%d x %d]\n",
			(void *)pbuffer, buffer_size, blocks, blocksize );

#ifdef SYS5
	memset( (char *)pbuffer, '\0', buffer_size );
#else
	bzero( (char *)pbuffer, buffer_size );
#endif
	/* Give both ids a known value before the semaphores are made */
	pbuffer->blocks_used_lock = -1;
	pbuffer->blocks_free_lock = -1;

	pbuffer->blocks_used_lock = new_sem();
	/* Start it off locked - it is unlocked when a buffer gets filled in */
	lock( pbuffer->blocks_used_lock );

	pbuffer->blocks_free_lock = new_sem();
	/* start this off so lock() can be called on it for each block
	 * till all the blocks are used up */
	sem_set( pbuffer->blocks_free_lock, blocks - 1 );

	/* Detach the shared memory so the fork doesn't do anything odd */
	shmdt( (char *)pbuffer );
	pbuffer = NO_BUFFER;
}

/*
 * Remove the two semaphores and the shared memory segment.
 *
 * Does its work at most once per process and does nothing when no
 * segment has been created yet.
 */
buffer_remove()
{
	static char in_progress = FALSE;
	int rc;

	/* A failure in get_buffer() below comes back here through
	 * byee() - do not start over */
	if( in_progress )
		return;
	in_progress = TRUE;

	/* Nothing created, nothing to take down */
	if( buffer_id == NONE )
		return;

	/* The segment exists but is not attached: this is the window
	 * after buffer_allocate() detached it and before reader() or
	 * writer() picked it up again */
	if( pbuffer == NO_BUFFER )
		get_buffer();

	if( debug )
		fprintf( stderr, "removing semaphores and buffer\n" );

	remove_sem( pbuffer->blocks_used_lock );
	remove_sem( pbuffer->blocks_free_lock );

	rc = shmctl( buffer_id, IPC_RMID, (struct shmid_ds *)0 );
	if( rc == -1 )
		perror( "buffer: failed to remove shared memory buffer" );
}

/*
 * Attach the shared memory segment and fill in the block table.
 *
 * Sets pbuffer; leaves through byee( -1 ) when the attach fails.
 *
 * Each block[].data is pointed at its blocksize-sized piece of
 * data_space.  The pointers are stored inside the shared segment but
 * hold addresses from the attaching process, and they are rewritten on
 * every call.
 * NOTE(review): this looks like it relies on reader and writer
 * attaching the segment at the same address - confirm.
 */
get_buffer()
{
	char *space;
	int n;

	/* Grab the buffer space */
	pbuffer = (struct buffer *)shmat( buffer_id, (char *)0, 0 );
	if( pbuffer == NO_BUFFER ){
		perror( "buffer: failed to attach shared memory" );
		byee( -1 );
	}

	/* Walk through the data space one block at a time */
	space = pbuffer->data_space;
	for( n = 0; n < blocks; n++ ){
		pbuffer->block[ n ].data = space;
		space += blocksize;
	}
}

/*
 * Fork.  The child runs writer(); the parent runs reader() and then
 * waits until the writer has terminated (or wait() fails).
 */
void
start_reader_and_writer()
{
	int status, deadpid;

	/* Nothing buffered may be carried into both processes */
	fflush( stdout );
	fflush( stderr );

	writer_pid = fork();
	switch( writer_pid ){
	case -1:
		perror( "buffer: unable to fork" );
		byee( -1 );
		break;

	case 0:
		/* The child */
		reader_pid = getppid();

		/* Never trust fork() to propagate signals - reset them */
		set_handlers();

		writer();
		break;

	default:
		/* The parent */
		reader();

		/* Now wait for the writer to finish */
		do {
			deadpid = wait( &status );
		} while( deadpid != writer_pid && deadpid != -1 );
		break;
	}
}

/*
 * The reader half: attach the buffer, then keep taking a free block and
 * filling it from the input until fill_block() returns 0.
 */
reader()
{
	if( debug )
		fprintf( stderr, "R: Entering reader\n" );

	get_buffer();

	do {
		get_next_free_block();
	} while( fill_block() );

	if( debug )
		fprintf( stderr, "R: Exiting reader\n" );
}

/*
 * Point curr_block at the next block to be filled and step
 * next_block_in on, circularly.
 */
get_next_free_block()
{
	int slot;

	/* Maybe wait till there is room in the buffer */
	lock( pbuffer->blocks_free_lock );

	slot = pbuffer->next_block_in;
	curr_block = &pbuffer->block[ slot ];
	pbuffer->next_block_in = INC( slot );
}

/*
 * Fill curr_block from fdin and hand it over to the writer.
 *
 * read() is repeated until the block holds blocksize bytes, the input
 * ends or an error occurs - this is what reblocks the input.  With
 * padblock set a partly filled block is zero-filled and passed on as a
 * full one.  Once end of input has been seen the next call passes on an
 * empty block, which the writer takes as end of data.
 *
 * Returns the number of bytes recorded in the block; 0 tells the
 * caller to stop.  A read error is reported and leaves through
 * byee( -1 ).
 */
fill_block()
{
	int bytes;
	char *dest;
	int remaining;
	static char eof_reached = 0;

	/* Input ended on an earlier call: send the end marker */
	if( eof_reached ){
		curr_block->bytes = 0;
		unlock( pbuffer->blocks_used_lock );
		return 0;
	}

	dest = curr_block->data;
	remaining = blocksize;

	/* Fill the block with input.  This reblocks the input. */
	while( remaining != 0 ){
		bytes = read( fdin, dest, remaining );
		if( bytes <= 0 )
			break;
		dest += bytes;
		remaining -= bytes;
	}

	if( bytes == 0 ){
		eof_reached = 1;
	}

	if( bytes < 0 ){
		perror( "buffer: failed to read input" );
		byee( -1 );
	}

	/* number of bytes available. Zero will be taken as eof */
	if( padblock && remaining != blocksize ){
		/* Something was read: pad it out to a whole block */
		if( remaining ) bzero( dest, remaining );
		curr_block->bytes = blocksize;
	}
	else {
		curr_block->bytes = blocksize - remaining;
	}

	if( debug > 1 )
		fprintf( stderr, "R: got %d bytes\n", curr_block->bytes );

	unlock( pbuffer->blocks_used_lock );

	return curr_block->bytes;
}

/*
 * The writer half: attach the buffer and write out the blocks the
 * reader has filled.
 *
 * Filled blocks are collected until there are (blocks * percent) / 100
 * of them and are then written in one go; whatever is still pending
 * when the empty end-of-data block arrives is written last.  The final
 * progress line (-S) and the total (-t) are printed before returning.
 */
writer()
{
	int pending = 0;
	int threshold = (blocks * percent) / 100;
	int first_block;

	if( debug )
		fprintf( stderr, "\tW: Entering writer\n blocks = %d\n maxfilled = %d\n",
			blocks,
			threshold );

	get_buffer();

	for( ;; ){
		/* Starting a new batch - remember where it begins */
		if( pending == 0 )
			first_block = pbuffer->next_block_out;

		get_next_filled_block();
		if( data_to_write() == 0 )
			break;

		pending++;
		if( debug > 1 )
			fprintf( stderr, "W: filled = %d\n", pending );

		/* Not enough collected yet */
		if( pending < threshold )
			continue;

		if( debug > 1 )
			fprintf( stderr, "W: writing\n" );
		write_blocks_to_stdout( pending, first_block );
		pending = 0;
	}

	/* Flush what was collected before the end marker turned up */
	write_blocks_to_stdout( pending, first_block );

	if( showevery ){
		pr_out();
	}

	if( print_total ){
		fprintf( stderr, "Kilobytes Out %lu\n", outk );
	}

	if( debug )
		fprintf( stderr, "\tW: Exiting writer\n" );
}

/*
 * Point curr_block at the next block to be written and step
 * next_block_out on, circularly.
 */
get_next_filled_block()
{
	int slot;

	/* Hang till some data is available */
	lock( pbuffer->blocks_used_lock );

	slot = pbuffer->next_block_out;
	curr_block = &pbuffer->block[ slot ];
	pbuffer->next_block_out = INC( slot );
}

/*
 * Returns the byte count of curr_block.  writer() treats a result of
 * zero as the end of the data.
 */
data_to_write()
{
	int nbytes = curr_block->bytes;

	return nbytes;
}

/*
 * Write a run of consecutive blocks.
 *
 * filled:      how many blocks to write.
 * first_block: index of the first one.
 *
 * next_block_out is wound back to first_block and stepped on again as
 * each block is written, so it ends up just past the run.
 */
write_blocks_to_stdout( filled, first_block )
	int filled;
	int first_block;
{
	int left;
	int slot;

	pbuffer->next_block_out = first_block;

	for( left = filled; left != 0; left-- ){
		slot = pbuffer->next_block_out;
		curr_block = &pbuffer->block[ slot ];
		pbuffer->next_block_out = INC( slot );
		write_block_to_stdout();
	}
}

/*
 * Write curr_block to fdout and give the block back to the reader.
 *
 * Also does the per-write bookkeeping: the optional pause (-u), the
 * running total in outk, the periodic lseek() for -Z and the progress
 * report for -S.  A write that does not transfer the whole block is
 * reported and leaves through byee( -1 ).
 *
 * On first use showevery is converted from bytes to Kbytes.
 */
write_block_to_stdout()
{
	static unsigned long out = 0;
	static unsigned long last_gb = 0;
	static unsigned long next_k = 0;
	/* Bytes written that do not yet add up to a whole K in outk */
	static unsigned long odd_bytes = 0;
	int written;

	if( next_k == 0 && showevery ){
		/* showevery is an int, so it is printed with %d */
		if( debug > 3 )
			fprintf( stderr, "next_k = %lu showevery = %d\n", next_k, showevery );
		showevery = showevery / 1024;
		/* The counters work in whole K.  An interval below 1K
		 * would become 0 here and switch the reports off, so
		 * round it up to the smallest step that can be shown. */
		if( showevery == 0 )
			showevery = 1;
		next_k = showevery;
	}

	if( (written = write( fdout, curr_block->data, curr_block->bytes )) != curr_block->bytes ){
		perror( "buffer: write of data failed" );
		fprintf( stderr, "bytes to write=%d, bytes written=%d, total written %10luK\n", curr_block->bytes, written, outk );
		byee( -1 );
	}

	if( write_pause ){
		usleep( write_pause );
	}

	/* Count in K, carrying the odd bytes over to the next block.
	 * Dividing each block on its own would drop them every time and
	 * never count anything for blocks smaller than 1K. */
	odd_bytes += curr_block->bytes;
	out = odd_bytes / 1024;
	odd_bytes %= 1024;
	outk += out;
	last_gb += out;

	/*
	 * on character special devices (tapes), do an lseek() every 1 Gb,
	 * to overcome the 2Gb limit. This resets the file offset to
	 * zero, but -- at least on exabyte SCSI drives -- does not perform
	 * any actual action on the tape.
	 */
	if( Zflag && last_gb >= 1 K K ){
		last_gb = 0;
		if( in_ISCHR )
			(void) lseek( fdin, 0, SEEK_SET);
		if( out_ISCHR )
			(void) lseek( fdout, 0, SEEK_SET);
	}
	if( showevery ){
		if( debug > 3 )
			fprintf( stderr, "outk = %lu, next_k = %lu\n",
				outk, next_k );
		if( outk >= next_k ){
			pr_out();
			next_k += showevery;
		}
	}

	/* The block is empty again - let the reader have it */
	unlock( pbuffer->blocks_free_lock );
}


/*
 * Leave the program.
 *
 * exit_val: the status handed to exit().
 *
 * In the process where writer_pid is set (the parent, i.e. the reader)
 * the shared buffer is removed first.  Otherwise a newline is printed
 * when progress reports are on, as those end in a carriage return.
 */
void
byee( exit_val )
	int exit_val;
{
	if( writer_pid == 0 ){
		if( showevery )
			fprintf( stderr, "\n" );
	}
	else {
		/* Only the parent (reader) should zap the buffer */
		buffer_remove();
	}

	exit( exit_val );
}

/*
 * Given a string of <num>[<suff>] returns a num.
 *
 * arg: the text to convert.  The suffix is taken from the first
 *      character of the word following the number:
 *	m/M  multiply by 1024 * 1024
 *	k/K  multiply by 1024
 *	b/B  multiply by 512
 *      With any other suffix, or none, the number is returned as given.
 *
 * Returns the scaled number, or 0 when arg does not start with a
 * number.  The multiplication is not checked for overflow.
 */
int
do_size( arg )
	char *arg;
{
	char suffix[ 20 ];
	/* Stays 0 when sscanf() finds no number */
	int ret = 0;

	suffix[ 0 ] = '\0';
	/* %19s - never store more than suffix[] can hold */
	sscanf( arg, "%d%19s", &ret, suffix );

	switch( suffix[ 0 ] ){
	case 'm':
	case 'M':
		ret *= 1024 * 1024;
		break;
	case 'k':
	case 'K':
		ret *= 1024;
		break;
	case 'b':
	case 'B':
		ret *= 512;
		break;
	default:
		break;
	}

	return ret;
}

/*
 * Print the running total of Kbytes written (outk) on stderr.  The
 * line ends in a carriage return and no newline.
 */
pr_out()
{
	unsigned long kbytes = outk;

	fprintf( stderr, "% 10luK\r", kbytes );
}

#ifdef SYS5
#include <sys/time.h>

/*
 * SYS5 stand-in for bzero(): set l bytes starting at b to zero.
 */
bzero( b, l )
	char *b;
	unsigned l;
{
	memset( b, 0, l );
}

/*
 * SIGALRM handler used by usleep() below.  It does nothing; it is only
 * installed so that the alarm is caught.
 *
 * sig: number of the signal being delivered (not used).
 *
 * Declared void with an int argument so that its type matches what
 * signal() expects of a handler.
 */
void
usleep_back( sig )
	int sig;
{
	(void) sig;
}

/*
 * SYS5 stand-in for usleep().
 *
 * u: microseconds to set the timer for.
 *
 * Arms a one-shot ITIMER_REAL timer, calls pause(), and afterwards
 * puts back the timer setting that was in force before.  The SIGALRM
 * handler is left set to usleep_back().
 */
usleep( u )
	unsigned u;
{
	struct itimerval saved, timer;

	signal( SIGALRM, usleep_back );

	/* One shot: a value but no reload interval */
	timer.it_value.tv_sec = u / 1000000;
	timer.it_value.tv_usec = u % 1000000;
	timer.it_interval.tv_sec = 0;
	timer.it_interval.tv_usec = 0;

	setitimer( ITIMER_REAL, &timer, &saved );
	pause();
	setitimer( ITIMER_REAL, &saved, NULL );
}
#endif
