Browse Source

AIO support

Suren A. Chilingaryan 11 years ago
parent
commit
5ae22aeedb
3 changed files with 214 additions and 37 deletions
  1. 16 1
      CMakeLists.txt
  2. 2 1
      config.h.in
  3. 196 35
      default.c

+ 16 - 1
CMakeLists.txt

@@ -5,6 +5,7 @@ set(FASTWRITER_ABI_VERSION "0")
 
 cmake_minimum_required(VERSION 2.8)
 
+# Kernel AIO writer (libaio) is opt-in: it stays disabled unless DISABLE_AIO is set to FALSE.
+set(DISABLE_AIO TRUE CACHE BOOL "Disable support of kernel AIO writer (libaio)")
 set(DISABLE_XFS_REALTIME FALSE CACHE BOOL "Disable support of RealTime XFS partition")
 
 
@@ -26,7 +27,16 @@ include_directories(
 add_definitions("-fPIC --std=c99 -Wall -O2 -pthread")
 
 set(HEADERS fastwriter.h sysinfo.h default.h private.h)
-add_library(fastwriter SHARED fastwriter.c sysinfo.c default.c) 
+# Translation units that make up the shared library.
+set(SOURCES
+    fastwriter.c
+    sysinfo.c
+    default.c
+)
+
+# The AIO writer needs libaio.h; configuration aborts if it is requested but missing.
+if(NOT DISABLE_AIO)
+    check_include_files("libaio.h" HAVE_LIBAIO_H)
+    if(NOT HAVE_LIBAIO_H)
+        message(FATAL_ERROR "error: libaio.h is not found...")
+    endif()
+endif()
+
+add_library(fastwriter SHARED ${SOURCES})
 
 set_target_properties(fastwriter PROPERTIES
     VERSION ${FASTWRITER_VERSION}
@@ -34,6 +44,11 @@ set_target_properties(fastwriter PROPERTIES
     LINK_FLAGS "-pthread"
 )
 
+# Link libaio only when the AIO writer is compiled in.
+if(NOT DISABLE_AIO)
+    target_link_libraries(fastwriter aio)
+endif()
+
+
 set(TARNAME "fastwriter")
 set(PACKAGE_VERSION ${FASTWRITER_VERSION})
 set(PACKAGE_NAME "${TARNAME}")

+ 2 - 1
config.h.in

@@ -1,2 +1,3 @@
 #cmakedefine HAVE_LINUX_FALLOC_H
-#cmakedefine DISABLE_XFS_REALTIME
+#cmakedefine DISABLE_XFS_REALTIME
+#cmakedefine DISABLE_AIO

+ 196 - 35
default.c

@@ -31,6 +31,7 @@
 # include <xfs/xfs.h>
 #endif /* !DISABLE_XFS_REALTIME */
 
+
 #include "fastwriter.h"
 #include "private.h"
 #include "sysinfo.h"
@@ -40,18 +41,57 @@
 #define HAVE_FALLOCATE
 #define EXT4_WRITEBLOCK 4194304
 #define EXT4_PREALLOCATE 1073741824
+#define OCFS_WRITEBLOCK 262144
+#define AIO_QUEUE_LENGTH 4
+#define AIO_BUFFERS 8
+
 
+#ifndef DISABLE_AIO
+# include <libaio.h>
+# if AIO_QUEUE_LENGTH > AIO_BUFFERS
+#  error "AIO_QUEUE_LENGTH > AIO_BUFFERS"
+# endif
+#endif /* DISABLE_AIO */
+
+
+#ifndef DISABLE_AIO
+typedef struct {		/* bookkeeping for one submitted AIO write; kept in the data[] ring of the writer context */
+    size_t offset;		/**< file offset of the chunk (fd_offset at submit time) */
+    size_t size;		/**< number of bytes submitted for the chunk */
+    int ios;			/**< iocb slot value recorded at submit time; pushed back onto ios_ready on completion */
+    int ready;			/**< 0 - unused, 1 - processing, 2 - done */
+} fastwriter_data_t;
+#endif /* !DISABLE_AIO */
 
 typedef struct {		/* state of the default writer, reached through fw->ctx */
     int fd;			/**< descriptor returned by open() */
 
-    int sync_mode;
+    int sync_mode;		/**< Open with O_DIRECT flag to avoid caches */
+    int aio_mode;		/**< Use kernel AIO (libaio.h) */
     
     size_t prior_size;		/**< original size of file */
     size_t preallocated;	/**< preallocated bytes */
     
     size_t wr_block;		/**< minimal block of data to write */
     size_t pa_block;		/**< preallocation step */
+
+#ifndef DISABLE_AIO
+    io_context_t aio;		/**< libaio context initialized by io_queue_init() */
+    
+    int ios_ready_n;		/**< number of valid entries in ios_ready */
+    int ios_ready[AIO_QUEUE_LENGTH];	/**< stack of iocb slot numbers available for submission */
+    struct iocb ios[AIO_QUEUE_LENGTH];	/**< iocb pool passed to io_submit() */
+
+    int data_head, data_tail;	/**< ring indexes into data[]: head is filled next, tail is reclaimed next */
+    fastwriter_data_t data[AIO_BUFFERS];	/**< ring of per-request records */
+    
+    int ios_status[AIO_QUEUE_LENGTH];	/**< NOTE(review): not referenced in the visible code - confirm it is needed */
+    
+    size_t sched;		/**< bytes already submitted from the caller's buffer but not yet reported as written */
+    size_t fd_offset;		/**< current file offset */
+
+    int page_size;		/**< getpagesize() value; a shortened tail chunk must be a multiple of it */
+#endif /* !DISABLE_AIO */
 } fastwriter_default_t;
 
 
@@ -89,9 +129,6 @@ int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags
 	ctx->wr_block = EXT4_WRITEBLOCK;
 	ctx->pa_block = 0;
 	ctx->prior_size = (size_t)-1;
-#ifdef SYNC_MODE
-//	ctx->sync_mode = 0;
-#endif /* SYNC_MODE */
 	open_flags &= ~(O_CREAT|O_NOATIME|O_LARGEFILE);
     } else if (!strcmp(fs, "ext4")) {
 	ctx->wr_block = EXT4_WRITEBLOCK;
@@ -102,22 +139,28 @@ int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags
     } else if (!strcmp(fs, "xfs")) {
 	ctx->wr_block = EXT4_WRITEBLOCK;
 	ctx->pa_block = EXT4_PREALLOCATE;
+    } else if (!strcmp(fs, "ocfs2")) {
+#ifndef DISABLE_AIO
+	ctx->aio_mode = 1;
+	ctx->sync_mode = 0;
+	ctx->wr_block = OCFS_WRITEBLOCK;
+#else /* !DISABLE_AIO */
+	ctx->wr_block = EXT4_WRITEBLOCK;
+#endif /* !DISABLE_AIO */
+	ctx->pa_block = EXT4_PREALLOCATE;
     } else {
 	ctx->wr_block = EXT4_WRITEBLOCK;
 	ctx->pa_block = 0;
     }
-    
-#ifdef SYNC_MODE
+
     if (ctx->sync_mode) {
 	open_flags |= O_DIRECT;
     }
-#endif /* SYNC_MODE */
 
     if (flags&FASTWRITER_FLAGS_OVERWRITE)
 	open_flags |= O_TRUNC;
 
     ctx->fd = open(name, open_flags, open_mode);
-#ifdef SYNC_MODE
     if (ctx->fd < 0) {
 	    // Running as normal user, try to disable direct mode
 	if ((errno == EINVAL)&&(ctx->sync_mode)) {
@@ -125,26 +168,23 @@ int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags
 	    open_flags &= ~O_DIRECT;
 	    ctx->fd = open(name, open_flags, open_mode);
 	}
-#endif /* SYNC_MODE */
 	if (ctx->fd < 0) return errno;
-#ifdef SYNC_MODE
     }
-#endif /* SYNC_MODE */
 
     if (((open_flags&FASTWRITER_FLAGS_OVERWRITE)==0)&&(strcmp(fs, "raw"))) {
-	ctx->prior_size = lseek(ctx->fd, 0, SEEK_END);
-# ifdef SYNC_MODE	
+	ctx->prior_size = lseek64(ctx->fd, 0, SEEK_END);
+
 	if (ctx->prior_size%FASTWRITER_SYNCIO_ALIGN) {
 	    close(ctx->fd);
 	    
 	    ctx->fd = open(name, open_flags&~O_DIRECT, open_mode);
 	    if (ctx->fd < 0) return errno;
 	    
-	    ctx->prior_size = lseek(ctx->fd, 0, SEEK_END);
+	    ctx->prior_size = lseek64(ctx->fd, 0, SEEK_END);
 	    
 	    ctx->sync_mode = 0;
+	    ctx->aio_mode = 0;
 	}
-# endif /* SYNC_MODE */
     }
 
 #ifndef DISABLE_XFS_REALTIME
@@ -153,11 +193,30 @@ int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags
 	if (!err) {
 	    attr.fsx_xflags |= XFS_XFLAG_REALTIME;
 	    err = xfsctl (name, ctx->fd, XFS_IOC_FSSETXATTR, (void *) &attr);
-//	    if (!err) puts("Real-time");
+	    if (err) fprintf(stderr, "Error initializing XFS real-time mode (%i), disabling...\n", err);
 	}
     }
 #endif /* !DISABLE_XFS_REALTIME */
 
+#ifndef DISABLE_AIO
+    if (ctx->aio_mode) {
+	int i;
+	ctx->page_size = getpagesize();
+	ctx->fd_offset = ctx->prior_size;
+
+	ctx->ios_ready_n = AIO_QUEUE_LENGTH;
+	for (i = 0; i < AIO_QUEUE_LENGTH; i++) {
+	    ctx->ios_ready[i] = i;
+	}
+	
+	err = io_queue_init(AIO_QUEUE_LENGTH, &ctx->aio);
+	if (err) {
+	    fprintf(stderr, "Error initializing AIO mode (%i), disabling...\n", -err);
+	    ctx->aio_mode = 0;
+	}
+    }
+#endif /* !DISABLE_AIO */
+
     ctx->preallocated = 0;
 
     return 0;
@@ -169,11 +228,31 @@ void fastwriter_default_close(fastwriter_t *fw) {
 	fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx;
 
 	if (ctx->fd >= 0) {
-#if defined(SYNC_MODE)||!defined(HAVE_LINUX_FALLOC_H)
+#ifndef DISABLE_AIO
+	    if ((ctx->aio_mode)&&(ctx->aio)) {
+		int n_ev;
+		struct io_event ev[AIO_QUEUE_LENGTH];
+		
+		while (ctx->ios_ready_n < AIO_QUEUE_LENGTH) {
+		    n_ev = io_getevents(ctx->aio, 1, AIO_QUEUE_LENGTH, &ev[0], NULL);
+		    if (n_ev <= 0) {
+			fprintf(stderr, "AIO io_getevents have failed (%i)", -n_ev);
+			break;
+		    }
+		    ctx->ios_ready_n += n_ev;
+		}
+		
+		io_queue_release(ctx->aio);
+	    }
+#endif /* DISABLE_AIO */
+
+#ifdef HAVE_LINUX_FALLOC_H
 	    if (ctx->prior_size != (size_t)-1) {
+#else /* HAVE_LINUX_FALLOC_H */
+	    if ((ctx->prior_size != (size_t)-1)&&((ctx->sync_mode)||(ctx->aio_mode))) {
+#endif /* HAVE_LINUX_FALLOC_H */
 		ftruncate(ctx->fd, ctx->prior_size + fw->written);
 	    }
-#endif
 	    close(ctx->fd);
 	}
 	
@@ -194,10 +273,9 @@ int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, s
 	    *written = 0;
 	    return 0;
 	}
-    
+
         size -= size % ctx->wr_block;
     }
-    
 
     if ((ctx->pa_block)&&((fw->written + size) > ctx->preallocated)) {
 #ifdef HAVE_LINUX_FALLOC_H
@@ -211,27 +289,110 @@ int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, s
 	}
     }
 
-#ifdef SYNC_MODE
 	// we expect this to happen only at last iteration (buffer is multiply of the required align)
-    if ((ctx->sync_mode)&&(size%FASTWRITER_SYNCIO_ALIGN)) {
+    if (((ctx->aio_mode)||(ctx->sync_mode))&&(size%FASTWRITER_SYNCIO_ALIGN)) {
 	delta = FASTWRITER_SYNCIO_ALIGN - size%FASTWRITER_SYNCIO_ALIGN;
     }
-#endif /* SYNC_MODE */
     
-    do {
-	res = write(ctx->fd, data + sum, size + delta - sum);
-	if (res < 0) {
-	    *written = sum;
-	    return errno;
-	}
+#ifndef DISABLE_AIO
+    if (ctx->aio_mode) {
+	int err;
+	size_t done = 0;
+	size_t sched = 0;
+
+	fastwriter_data_t *iodata;
+	struct iocb *newio;
+	size_t wr_block = ctx->wr_block;
+
+	/*
+	 * Submit chunks of the caller's buffer while iocb slots and ring entries
+	 * are available, and reap completions until at least one chunk at the tail
+	 * of the ring is confirmed (done > 0). Errors are returned as positive
+	 * codes with *written set to 0.
+	 * NOTE(review): on an error return the requests already submitted in this
+	 * call are not accounted in ctx->sched; the stream is considered broken.
+	 */
+	do {
+	    /* Both conditions change as requests are submitted and reclaimed, so re-evaluate on every pass */
+	    int can_sched = ((ctx->sched + sched) < size);
+	    int ring_full = ((ctx->data_head == ctx->data_tail)&&(ctx->data[ctx->data_head].ready));
+
+	    /*
+	     * Wait for completions whenever no progress is possible by submitting:
+	     * no free iocb, no free ring entry, or nothing left to submit. Waiting
+	     * only when the iocb stack is empty makes the loop spin forever for
+	     * buffers smaller than AIO_QUEUE_LENGTH blocks.
+	     */
+	    if ((!ctx->ios_ready_n)||(ring_full)||(!can_sched)) {
+		int i, n_ev;
+		struct io_event ev[AIO_QUEUE_LENGTH];
+
+		/* Nothing in flight and nothing to submit: io_getevents would block forever */
+		if (ctx->ios_ready_n == AIO_QUEUE_LENGTH) break;
+
+		n_ev = io_getevents(ctx->aio, 1, AIO_QUEUE_LENGTH, &ev[0], NULL);
+		if (n_ev <= 0) {
+		    fprintf(stderr, "AIO io_getevents have failed (%i)\n", -n_ev);
+		    *written = 0;
+		    return n_ev ? -n_ev : EIO;
+		}
+
+		for (i = 0; i < n_ev; i++) {
+		    fastwriter_data_t *ev_data = (fastwriter_data_t *)(ev[i].data);
+		    /* libaio declares res/res2 as unsigned long; a failure is a negative errno, so compare as signed */
+		    long ev_res = (long)ev[i].res;
+		    long ev_res2 = (long)ev[i].res2;
+
+		    if ((ev_res2)||(ev_res < 0)||((size_t)ev_res < ev_data->size)) {
+			int code = EIO;		/* short write without an error code */
+			if (ev_res < 0) code = (int)-ev_res;
+			else if (ev_res2) code = (int)-ev_res2;
+
+			fprintf(stderr, "AIO write failed (res: %li, res2: %li, expected: %zu), no handling data will be corrupted...\n", ev_res, ev_res2, ev_data->size);
+			*written = 0;
+			return code;
+		    }
+
+		    /* Return the iocb slot to the free stack and mark the record as done */
+		    ctx->ios_ready[ctx->ios_ready_n++] = ev_data->ios;
+		    ev_data->ready = 2;
+		}
+
+		/* Records are reclaimed strictly in submission order, starting from the tail */
+		while (ctx->data[ctx->data_tail].ready > 1) {
+		    ctx->data[ctx->data_tail].ready = 0;
+
+		    done += ctx->data[ctx->data_tail].size;
+		    if ((++ctx->data_tail) == AIO_BUFFERS) ctx->data_tail = 0;
+		}
+
+		ring_full = ((ctx->data_head == ctx->data_tail)&&(ctx->data[ctx->data_head].ready));
+	    }
+
+	    if ((can_sched)&&(ctx->ios_ready_n)&&(!ring_full)) {
+		int slot;
+
+		/* Validate the chunk size before taking an iocb slot, so a failure does not leak the slot */
+		if (wr_block > ((size + delta) - (ctx->sched + sched))) {
+		    wr_block = (size + delta) - (ctx->sched + sched);
+		    if (wr_block % ctx->page_size) {
+			fprintf(stderr, "We need to write incomplete page (%zu bytes). This is no supported yet...\n", wr_block);
+			*written = 0;
+			return -1;
+		    }
+		}
+
+		slot = ctx->ios_ready[--ctx->ios_ready_n];
+		newio = (struct iocb*)&ctx->ios[slot];
+		iodata = &ctx->data[ctx->data_head];
+
+		iodata->offset = ctx->fd_offset;
+		iodata->size = wr_block;
+		/* Remember the iocb slot itself, not the stack depth: completions may arrive out of order */
+		iodata->ios = slot;
+
+		io_prep_pwrite(newio, ctx->fd, data + ctx->sched + sched, wr_block, ctx->fd_offset);
+		io_set_callback(newio, (void*)iodata);
+		err = io_submit(ctx->aio, 1, &newio);
+		if (err != 1) {
+		    fprintf(stderr, "Error submiting AIO job (%i)\n", -err);
+		    ctx->ios_ready_n++;		/* the slot was not used, put it back on the stack */
+		    *written = 0;
+		    return err ? -err : EIO;
+		}
+
+		iodata->ready = 1;
+		sched += wr_block;
+		ctx->fd_offset += wr_block;
+		if ((++ctx->data_head) == AIO_BUFFERS) ctx->data_head = 0;
+	    }
+	} while (!done);
+
+	ctx->sched += sched - done;
+	size = done;
+    } else {
+#endif /* !DISABLE_AIO */
+	do {
+	    res = write(ctx->fd, data + sum, size + delta - sum);
+	    if (res < 0) {
+		*written = sum;
+		return errno;
+	    }
 	
-	sum += res;
-    } while (sum < size);
+	    sum += res;
+	} while (sum < size);
+#ifndef DISABLE_AIO
+    }
+#endif /* !DISABLE_AIO */
+
+    if ((ctx->sync_mode)||(ctx->aio_mode)) {
+	posix_fadvise(ctx->fd, fw->written, size, POSIX_FADV_DONTNEED);
+    }
 
-#ifdef SYNC_MODE    
-    posix_fadvise(ctx->fd, fw->written, size, POSIX_FADV_DONTNEED);
-#endif /* SYNC_MODE */
-    
     *written = size;
+
     return 0;
 }