#define _FASTWRITER_DEFAULT_C #define _GNU_SOURCE #define _XOPEN_SOURCE 600 #define _POSIX_C_SOURCE 200112L #define _LARGEFILE64_SOURCE #include "config.h" #include #include #include #include #include #include #include #include #include #include #include #ifdef HAVE_LINUX_FALLOC_H # include #endif /* HAVE_LINUX_FALLOC_H */ #ifndef DISABLE_XFS_REALTIME # include #endif /* !DISABLE_XFS_REALTIME */ #include "fastwriter.h" #include "private.h" #include "sysinfo.h" #include "default.h" #define SYNC_MODE #define HAVE_FALLOCATE #define EXT4_WRITEBLOCK 4194304 #define EXT4_PREALLOCATE 1073741824 #define OCFS_WRITEBLOCK 262144 #define AIO_QUEUE_LENGTH 4 #define AIO_BUFFERS 8 #ifndef DISABLE_AIO # include # if AIO_QUEUE_LENGTH > AIO_BUFFERS # error "AIO_QUEUE_LENGTH > AIO_BUFFERS" # endif #endif /* DISABLE_AIO */ #ifndef DISABLE_AIO typedef struct { size_t offset; size_t size; int ios; int ready; /**< 0 - unused, 1 - processing, 2 - done */ } fastwriter_data_t; #endif /* !DISABLE_AIO */ typedef struct { int fd; int sync_mode; /**< Open with O_DIRECT flag to avoid caches */ int aio_mode; /**< Use kernel AIO (libaio.h) */ size_t prior_size; /**< original size of file */ size_t preallocated; /**< preallocated bytes */ size_t wr_block; /**< minimal block of data to write */ size_t pa_block; /**< preallocation setp */ #ifndef DISABLE_AIO io_context_t aio; int ios_ready_n; int ios_ready[AIO_QUEUE_LENGTH]; struct iocb ios[AIO_QUEUE_LENGTH]; int data_head, data_tail; fastwriter_data_t data[AIO_BUFFERS]; int ios_status[AIO_QUEUE_LENGTH]; size_t sched; /**< how far we ahead of currently writted head */ size_t fd_offset; /**< current file offset */ int page_size; #endif /* !DISABLE_AIO */ } fastwriter_default_t; int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags_t flags) { int err; char fs[16]; #ifndef DISABLE_XFS_REALTIME struct fsxattr attr; #endif /* !DISABLE_XFS_REALTIME */ int open_flags = (O_CREAT|O_WRONLY|O_NOATIME|O_LARGEFILE); int open_mode = (S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); fastwriter_default_t *ctx; err = fastwriter_get_file_fs(name, sizeof(fs) - 1, fs); if (err) return err; ctx = (fastwriter_default_t*)malloc(sizeof(fastwriter_default_t)); if (!ctx) return ENOMEM; memset(ctx, 0, sizeof(fastwriter_default_t)); fw->ctx = ctx; #ifdef SYNC_MODE ctx->sync_mode = 1; #endif /* SYNC_MODE */ ctx->prior_size = 0; if (!strcmp(fs, "raw")) { ctx->wr_block = EXT4_WRITEBLOCK; ctx->pa_block = 0; ctx->prior_size = (size_t)-1; open_flags &= ~(O_CREAT|O_NOATIME|O_LARGEFILE); } else if (!strcmp(fs, "ext4")) { ctx->wr_block = EXT4_WRITEBLOCK; ctx->pa_block = EXT4_PREALLOCATE; } else if (!strcmp(fs, "btrfs")) { ctx->wr_block = EXT4_WRITEBLOCK; ctx->pa_block = EXT4_PREALLOCATE; } else if (!strcmp(fs, "xfs")) { ctx->wr_block = EXT4_WRITEBLOCK; ctx->pa_block = EXT4_PREALLOCATE; } else if (!strcmp(fs, "ocfs2")) { #ifndef DISABLE_AIO ctx->aio_mode = 1; ctx->sync_mode = 0; ctx->wr_block = OCFS_WRITEBLOCK; #else /* !DISABLE_AIO */ ctx->wr_block = EXT4_WRITEBLOCK; #endif /* !DISABLE_AIO */ ctx->pa_block = EXT4_PREALLOCATE; /* } else if (!strcmp(fs, "fhgfs")) { ctx->sync_mode = 0; ctx->wr_block = OCFS_WRITEBLOCK; ctx->pa_block = EXT4_PREALLOCATE; } else if (strstr(fs, "gluster")) { ctx->sync_mode = 0; ctx->wr_block = OCFS_WRITEBLOCK; ctx->pa_block = EXT4_PREALLOCATE;*/ } else { ctx->sync_mode = 0; ctx->wr_block = OCFS_WRITEBLOCK; ctx->pa_block = 0; } if (ctx->sync_mode) { open_flags |= O_DIRECT; } if (flags&FASTWRITER_FLAGS_OVERWRITE) open_flags |= O_TRUNC; ctx->fd = open(name, open_flags, open_mode); if (ctx->fd < 0) { // Running as normal user, try to disable direct mode if ((errno == EINVAL)&&(ctx->sync_mode)) { ctx->sync_mode = 0; open_flags &= ~O_DIRECT; ctx->fd = open(name, open_flags, open_mode); } if (ctx->fd < 0) return errno; } if (((open_flags&FASTWRITER_FLAGS_OVERWRITE)==0)&&(strcmp(fs, "raw"))) { ctx->prior_size = lseek64(ctx->fd, 0, SEEK_END); if (ctx->prior_size%FASTWRITER_SYNCIO_ALIGN) { close(ctx->fd); ctx->fd = open(name, open_flags&~O_DIRECT, open_mode); if (ctx->fd < 0) return errno; ctx->prior_size = lseek64(ctx->fd, 0, SEEK_END); ctx->sync_mode = 0; ctx->aio_mode = 0; } } #ifndef DISABLE_XFS_REALTIME if (!strcmp(fs, "xfs")) { err = xfsctl (name, ctx->fd, XFS_IOC_FSGETXATTR, (void *) &attr); if (!err) { attr.fsx_xflags |= XFS_XFLAG_REALTIME; err = xfsctl (name, ctx->fd, XFS_IOC_FSSETXATTR, (void *) &attr); if (err) fprintf(stderr, "Error initializing XFS real-time mode (%i), disabling...\n", err); } } #endif /* !DISABLE_XFS_REALTIME */ #ifndef DISABLE_AIO if (ctx->aio_mode) { int i; ctx->page_size = getpagesize(); ctx->fd_offset = ctx->prior_size; ctx->ios_ready_n = AIO_QUEUE_LENGTH; for (i = 0; i < AIO_QUEUE_LENGTH; i++) { ctx->ios_ready[i] = i; } err = io_queue_init(AIO_QUEUE_LENGTH, &ctx->aio); if (err) { fprintf(stderr, "Error initializing AIO mode (%i), disabling...\n", -err); ctx->aio_mode = 0; } } #endif /* !DISABLE_AIO */ ctx->preallocated = 0; return 0; } void fastwriter_default_close(fastwriter_t *fw) { if (fw->ctx) { fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx; if (ctx->fd >= 0) { #ifndef DISABLE_AIO if ((ctx->aio_mode)&&(ctx->aio)) { int n_ev; struct io_event ev[AIO_QUEUE_LENGTH]; while (ctx->ios_ready_n < AIO_QUEUE_LENGTH) { n_ev = io_getevents(ctx->aio, 1, AIO_QUEUE_LENGTH, &ev[0], NULL); if (n_ev <= 0) { fprintf(stderr, "AIO io_getevents have failed (%i)", -n_ev); break; } ctx->ios_ready_n += n_ev; } io_queue_release(ctx->aio); } #endif /* DISABLE_AIO */ #ifdef HAVE_LINUX_FALLOC_H if (ctx->prior_size != (size_t)-1) { #else /* HAVE_LINUX_FALLOC_H */ if ((ctx->prior_size != (size_t)-1)&&((ctx->sync_mode)||(ctx->aio_mode))) { #endif /* HAVE_LINUX_FALLOC_H */ ftruncate(ctx->fd, ctx->prior_size + fw->written); } close(ctx->fd); } free(ctx); fw->ctx = NULL; } } int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, size_t size, void *data, size_t *written) { size_t sum = 0; size_t delta = 0; ssize_t res; fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx; if ((flags&FASTWRITER_WRITE_FLAG_FORCE)==0) { if (size < ctx->wr_block) { *written = 0; return 0; } size -= size % ctx->wr_block; } if ((ctx->pa_block)&&((fw->written + size) > ctx->preallocated)) { #ifdef HAVE_LINUX_FALLOC_H if (fallocate(ctx->fd, FALLOC_FL_KEEP_SIZE, ctx->preallocated, ctx->pa_block)) { #else /* HAVE_LINUX_FALLOC_H */ if (posix_fallocate(ctx->fd, ctx->preallocated, ctx->pa_block)) { #endif /* HAVE_LINUX_FALLOC_H */ ctx->pa_block = 0; } else { ctx->preallocated += ctx->pa_block; } } // we expect this to happen only at last iteration (buffer is multiply of the required align) if (((ctx->aio_mode)||(ctx->sync_mode))&&(size%FASTWRITER_SYNCIO_ALIGN)) { delta = FASTWRITER_SYNCIO_ALIGN - size%FASTWRITER_SYNCIO_ALIGN; } #ifndef DISABLE_AIO if (ctx->aio_mode) { int err; size_t done = 0; size_t sched = 0; fastwriter_data_t *iodata; struct iocb *newio; size_t wr_block = ctx->wr_block; do { if (!ctx->ios_ready_n) { int i, n_ev; struct io_event ev[AIO_QUEUE_LENGTH]; n_ev = io_getevents(ctx->aio, 1, AIO_QUEUE_LENGTH, &ev[0], NULL); if (n_ev <= 0) { fprintf(stderr, "AIO io_getevents have failed (%i)", -n_ev); return -n_ev; } for (i = 0; i < n_ev; i++) { fastwriter_data_t *ev_data = (fastwriter_data_t *)(ev[i].data); if ((ev[i].res2)||(ev[i].res < ev_data->size)) { fprintf(stderr, "AIO write failed (res: %li, res2: %li, expected: %zu), no handling data will be corrupted...\n", ev[i].res, ev[i].res2, ev_data->size); return -ev[i].res2; } ctx->ios_ready[ctx->ios_ready_n++] = ev_data->ios; // printf("Data: %i (ios %i)\n", ev_data->ready, ev_data->ios); ev_data->ready = 2; } while (ctx->data[ctx->data_tail].ready > 1) { // printf("Done: %i %zu\n", ctx->data_tail, ctx->data[ctx->data_tail].offset); ctx->data[ctx->data_tail].ready = 0; done += ctx->data[ctx->data_tail].size; if ((++ctx->data_tail) == AIO_BUFFERS) ctx->data_tail = 0; } } if ((ctx->sched + sched) < size) { if ((ctx->data_head == ctx->data_tail)&&(ctx->data[ctx->data_head].ready)) continue; newio = (struct iocb*)&ctx->ios[ctx->ios_ready[--ctx->ios_ready_n]]; iodata = &ctx->data[ctx->data_head]; if (wr_block > ((size + delta) - (ctx->sched + sched))) { wr_block = (size + delta) - (ctx->sched + sched); if (wr_block % ctx->page_size) { fprintf(stderr, "We need to write incomplete page (%zu bytes). This is no supported yet...\n", wr_block); return -1; } } // printf("Sched: %lu => %lu (%lu) [tail %lu, head %lu]\n", ctx->sched + sched, ctx->fd_offset, wr_block, ctx->data_tail, ctx->data_head); iodata->offset = ctx->fd_offset; iodata->size = wr_block; iodata->ios = ctx->ios_ready_n; io_prep_pwrite(newio, ctx->fd, data + ctx->sched + sched, wr_block, ctx->fd_offset); io_set_callback(newio, (void*)iodata); err = io_submit(ctx->aio, 1, &newio); if (err != 1) { fprintf(stderr, "Error submiting AIO job (%i)\n", -err); return -err; } iodata->ready = 1; sched += wr_block; ctx->fd_offset += wr_block; if ((++ctx->data_head) == AIO_BUFFERS) ctx->data_head = 0; } } while (!done); ctx->sched += sched - done; size = done; } else { #endif /* !DISABLE_AIO */ do { res = write(ctx->fd, data + sum, size + delta - sum); if (res < 0) { *written = sum; return errno; } sum += res; } while (sum < size); #ifndef DISABLE_AIO } #endif /* !DISABLE_AIO */ if ((ctx->sync_mode)||(ctx->aio_mode)) { posix_fadvise(ctx->fd, fw->written, size, POSIX_FADV_DONTNEED); } *written = size; return 0; }