Browse Source

Few synchronization and alignment related fixes

Suren A. Chilingaryan 12 years ago
parent
commit
9c14774f2b
3 changed files with 56 additions and 23 deletions
  1. 38 13
      default.c
  2. 17 10
      fastwriter.c
  3. 1 0
      private.h

+ 38 - 13
default.c

@@ -40,6 +40,8 @@
 
 typedef struct {
     int fd;
+
+    int sync_mode;
     
     size_t prior_size;		/**< original size of file */
     size_t preallocated;	/**< preallocated bytes */
@@ -56,9 +58,6 @@ int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags
     int open_flags = (O_CREAT|O_WRONLY|O_NOATIME|O_LARGEFILE);
     int open_mode = (S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
 
-#ifdef SYNC_MODE
-    open_flags |= O_DIRECT;//|O_SYNC;
-#endif /* SYNC_MODE */
     
     fastwriter_default_t *ctx;
 
@@ -72,9 +71,17 @@ int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags
 
     fw->ctx = ctx;
 
+#ifdef SYNC_MODE
+    open_flags |= O_DIRECT;
+    ctx->sync_mode = 1;
+#endif /* SYNC_MODE */
+
+    ctx->prior_size = 0;
+
     if (!strcmp(fs, "raw")) {
 	ctx->wr_block = EXT4_WRITEBLOCK;
 	ctx->pa_block = 0;
+	ctx->prior_size = (size_t)-1;
     } else if (!strcmp(fs, "ext4")) {
 	ctx->wr_block = EXT4_WRITEBLOCK;
 	ctx->pa_block = EXT4_PREALLOCATE;
@@ -95,13 +102,21 @@ int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags
     ctx->fd = open(name, open_flags, open_mode);
     if (ctx->fd < 0) return errno;
 
-    ctx->prior_size = 0;
-    
-#ifndef HAVE_LINUX_FALLOC_H
     if (((open_flags&FASTWRITER_FLAGS_OVERWRITE)==0)&&(strcmp(fs, "raw"))) {
 	ctx->prior_size = lseek(ctx->fd, 0, SEEK_END);
+# ifdef SYNC_MODE	
+	if (ctx->prior_size%FASTWRITER_SYNCIO_ALIGN) {
+	    close(ctx->fd);
+	    
+	    ctx->fd = open(name, open_flags&~O_DIRECT, open_mode);
+	    if (ctx->fd < 0) return errno;
+	    
+	    ctx->prior_size = lseek(ctx->fd, 0, SEEK_END);
+	    
+	    ctx->sync_mode = 0;
+	}
+# endif /* SYNC_MODE */
     }
-#endif /* HAVE_LINUX_FALLOC_H */
 
     ctx->preallocated = 0;
 
@@ -114,11 +129,11 @@ void fastwriter_default_close(fastwriter_t *fw) {
 	fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx;
 
 	if (ctx->fd >= 0) {
-#ifndef HAVE_LINUX_FALLOC_H
-	    if (ctx->prior_size) {
-		ftrucate(ctx->fd, ctx->prior_size + fw->written);
+#if defined(SYNC_MODE)||!defined(HAVE_LINUX_FALLOC_H)
+	    if (ctx->prior_size != (size_t)-1) {
+		ftruncate(ctx->fd, ctx->prior_size + fw->written);
 	    }
-#endif /* HAVE_LINUX_FALLOC_H */
+#endif
 	    close(ctx->fd);
 	}
 	
@@ -130,9 +145,10 @@ void fastwriter_default_close(fastwriter_t *fw) {
 
 int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, size_t size, void *data, size_t *written) {
     size_t sum = 0;
+    size_t delta = 0;
     ssize_t res;
     fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx;
-    
+
     if ((flags&FASTWRITER_WRITE_FLAG_FORCE)==0) {
 	if (size < ctx->wr_block) {
 	    *written = 0;
@@ -141,6 +157,7 @@ int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, s
     
         size -= size % ctx->wr_block;
     }
+    
 
     if ((ctx->pa_block)&&((fw->written + size) > ctx->preallocated)) {
 #ifdef HAVE_LINUX_FALLOC_H
@@ -153,9 +170,17 @@ int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, s
 	    ctx->preallocated += ctx->pa_block;
 	}
     }
+
+#ifdef SYNC_MODE
+	// we expect this to happen only at last iteration (buffer is multiply of the required align)
+    if ((ctx->sync_mode)&&(size%FASTWRITER_SYNCIO_ALIGN)) {
+	delta = FASTWRITER_SYNCIO_ALIGN - size%FASTWRITER_SYNCIO_ALIGN;
+    }
+#endif /* SYNC_MODE */
     
     do {
-	res = write(ctx->fd, data, size);
+	res = write(ctx->fd, data + sum, size + delta - sum);
+//	printf("%i %i %p %zu %i\n", res, ctx->fd, data, size, delta);
 	if (res < 0) {
 	    *written = sum;
 	    return errno;

+ 17 - 10
fastwriter.c

@@ -68,9 +68,12 @@ int fastwriter_open(fastwriter_t *ctx, const char *name, fastwriter_flags_t flag
      default:
 	ctx->size = ctx->params.buffer_size;
     }
-    
-    ctx->buffer = malloc(ctx->size);
-    if (!ctx->buffer) {
+
+    if (ctx->size%FASTWRITER_SYNCIO_ALIGN)
+	ctx->size += FASTWRITER_SYNCIO_ALIGN - (ctx->size%FASTWRITER_SYNCIO_ALIGN);
+
+    err = posix_memalign(&ctx->buffer, FASTWRITER_SYNCIO_ALIGN, ctx->size);
+    if ((err)||(!ctx->buffer)) {
 	fastwriter_close(ctx);
 	return ENOMEM;
     }
@@ -149,7 +152,7 @@ int fastwriter_close(fastwriter_t *ctx) {
 	ctx->buffer = NULL;
     }
     
-    return 0;
+    return ctx->err;
     
 }
 
@@ -177,6 +180,7 @@ static void *fastwriter_writer_thread(void *user) {
 
     fastwriter_t *ctx = (fastwriter_t*)user;
 
+
     while ((ctx->run_flag)||(ctx->head != ctx->tail)) {
 	if (ctx->head != ctx->tail) {
 	    head = ctx->head;
@@ -219,12 +223,14 @@ static void *fastwriter_writer_thread(void *user) {
 		while ((ctx->run_flag)&&(ctx->head == head)) {
 		    pthread_cond_wait(&ctx->data_cond, &ctx->data_cond_mutex);
 		}
+		pthread_mutex_unlock(&ctx->data_cond_mutex);
 	    }
 	} else {
 	    pthread_mutex_lock(&ctx->data_cond_mutex);
 	    while ((ctx->run_flag)&&(ctx->head == ctx->tail)) {
 		pthread_cond_wait(&ctx->data_cond, &ctx->data_cond_mutex);
 	    }
+	    pthread_mutex_unlock(&ctx->data_cond_mutex);
 	}
     }
     
@@ -251,19 +257,19 @@ int fastwriter_push(fastwriter_t *ctx, size_t size, const void *data) {
 	end = ctx->size - (free - size);
 	if (end > ctx->max_usage) ctx->max_usage = end;
     }
-    
+
     if (!ctx->run_flag) {
 	if (ctx->err) return ctx->err;
 	return EBADFD;
     }
-    
+
     if (ctx->pos < ctx->tail) end = ctx->tail;
     else end = ctx->size;
-    
+
 
     part1 = end - ctx->pos;
-    
-    if (part1 > size) {
+
+    if (part1 < size) {
 	    // tail < pos (we have checked for free space)
 	end = size - part1;
 	memcpy(ctx->buffer + ctx->pos, data, part1);
@@ -307,9 +313,10 @@ int fastwriter_cancel(fastwriter_t *ctx) {
 
 int fastwriter_push_data(fastwriter_t *ctx, size_t size, const void *buf) {
     int err;
+
     err = fastwriter_push(ctx, size, buf);
     if (err) return err;
-    
+
     err = fastwriter_commit(ctx);
     if (err) fastwriter_cancel(ctx);
 

+ 1 - 0
private.h

@@ -1,6 +1,7 @@
 #ifndef _FASTWRITER_PRIVATE_H
 #define _FASTWRITER_PRIVATE_H
 
+#define FASTWRITER_SYNCIO_ALIGN 512
 #define FASTWRITER_DEFAULT_BUFFER_SIZE 134217728 /* 128 MB */
 #define FASTWRITER_RESERVE_MEMORY 536870912 /* 512 MB */