#define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include "private.h" #include "default.h" #include "sysinfo.h" #ifdef USE_CUSTOM_MEMCPY # include "memcpy.h" #else /* USE_CUSTOM_MEMCPY */ # define fast_memcpy memcpy #endif /* USE_CUSTOM_MEMCPY */ fastwriter_t *fastwriter_init(const char *fs, fastwriter_flags_t flags) { fastwriter_t *ctx; ctx = (fastwriter_t*)malloc(sizeof(fastwriter_t)); if (!ctx) return ctx; memset(ctx, 0, sizeof(fastwriter_t)); ctx->params.flags = flags; ctx->api = &fastwriter_default_api; return ctx; } void fastwriter_destroy(fastwriter_t *ctx) { free(ctx); } int fastwriter_set_buffer_size(fastwriter_t *ctx, size_t buffer_size) { ctx->params.buffer_size = buffer_size; return 0; } static void *fastwriter_writer_thread(void *user); int fastwriter_open(fastwriter_t *ctx, const char *name, fastwriter_flags_t flags) { int i; int err; int e[4]; ctx->flags = flags | ctx->params.flags; switch (ctx->params.buffer_size) { case FASTWRITER_BUFFER_DEFAULT: ctx->size = FASTWRITER_DEFAULT_BUFFER_SIZE; break; case FASTWRITER_BUFFER_MAX: ctx->size = fastwriter_get_free_memory(); if ((ctx->size == (size_t)-1)||((ctx->size - FASTWRITER_RESERVE_MEMORY) < FASTWRITER_DEFAULT_BUFFER_SIZE)) ctx->size = FASTWRITER_DEFAULT_BUFFER_SIZE; else ctx->size -= FASTWRITER_RESERVE_MEMORY; break; default: ctx->size = ctx->params.buffer_size; } if (ctx->size%FASTWRITER_SYNCIO_ALIGN) ctx->size += FASTWRITER_SYNCIO_ALIGN - (ctx->size%FASTWRITER_SYNCIO_ALIGN); err = posix_memalign(&ctx->buffer, FASTWRITER_SYNCIO_ALIGN, ctx->size); if ((err)||(!ctx->buffer)) { fastwriter_close(ctx); return ENOMEM; } ctx->err = 0; ctx->written = 0; ctx->commited = 0; ctx->chunked = 0; ctx->tail = 0; ctx->head = 0; ctx->pos = 0; err = ctx->api->open(ctx, name, ctx->flags); if (err) { fastwriter_close(ctx); return err; } e[0] = pthread_mutex_init(&ctx->data_cond_mutex, NULL); e[1] = pthread_mutex_init(&ctx->space_cond_mutex, NULL); e[2] = pthread_cond_init(&ctx->data_cond, NULL); e[3] = pthread_cond_init(&ctx->space_cond, NULL); if (e[0]|e[1]|e[2]|e[3]) { if (!e[3]) pthread_cond_destroy(&ctx->space_cond); if (!e[2]) pthread_cond_destroy(&ctx->data_cond); if (!e[1]) pthread_mutex_destroy(&ctx->space_cond_mutex); if (!e[0]) pthread_mutex_destroy(&ctx->data_cond_mutex); fastwriter_close(ctx); for (i = 0; i < 4; i++) if (e[i]) return e[i]; } ctx->clean_locks = 1; ctx->run_flag = 1; err = pthread_create(&ctx->wthread, NULL, &fastwriter_writer_thread, ctx); if (err) { ctx->run_flag = 0; fastwriter_close(ctx); return err; } return 0; } int fastwriter_close(fastwriter_t *ctx) { if ((!ctx->err)&&(ctx->pos != ctx->head)) return EBADFD; if (ctx->run_flag) { ctx->run_flag = 0; pthread_mutex_lock(&ctx->data_cond_mutex); pthread_cond_broadcast(&ctx->data_cond); pthread_mutex_unlock(&ctx->data_cond_mutex); pthread_join(ctx->wthread, NULL); } if (ctx->clean_locks) { pthread_cond_destroy(&ctx->space_cond); pthread_cond_destroy(&ctx->data_cond); pthread_mutex_destroy(&ctx->space_cond_mutex); pthread_mutex_destroy(&ctx->data_cond_mutex); ctx->clean_locks = 0; } ctx->api->close(ctx); if (ctx->buffer) { free(ctx->buffer); ctx->buffer = NULL; } return ctx->err; } static inline size_t fastwriter_compute_free_space(fastwriter_t *ctx) { if (ctx->pos < ctx->tail) return ctx->tail - ctx->pos; return ctx->tail + ctx->size - ctx->pos - 1; } int fastwriter_get_stats(fastwriter_t *ctx, fastwriter_stats_t *stats) { stats->buffer_size = ctx->size; stats->buffer_used = ctx->size - fastwriter_compute_free_space(ctx); stats->buffer_max = ctx->max_usage; stats->commited = ctx->commited; stats->written = ctx->written; return 0; } static void *fastwriter_writer_thread(void *user) { int err = 0; fastwriter_write_flags_t flags; size_t size; size_t head; fastwriter_t *ctx = (fastwriter_t*)user; while ((ctx->run_flag)||(ctx->head != ctx->tail)) { if (ctx->head != ctx->tail) { head = ctx->head; if (head > ctx->tail) { size = head - ctx->tail; flags = FASTWRITER_WRITE_FLAGS_DEFAULT; } else { size = ctx->size - ctx->tail; flags = FASTWRITER_WRITE_FLAG_FORCE; } if (!ctx->run_flag) flags |= FASTWRITER_WRITE_FLAG_FORCE; err = ctx->api->write(ctx, flags, size, ctx->buffer + ctx->tail, &size); if (err) { ctx->err = err; ctx->run_flag = 0; pthread_mutex_lock(&ctx->space_cond_mutex); pthread_cond_broadcast(&ctx->space_cond); pthread_mutex_unlock(&ctx->space_cond_mutex); return NULL; } if (size > 0) { ctx->written += size; size += ctx->tail; if (size == ctx->size) ctx->tail = 0; else ctx->tail = size; pthread_mutex_lock(&ctx->space_cond_mutex); pthread_cond_broadcast(&ctx->space_cond); pthread_mutex_unlock(&ctx->space_cond_mutex); } else { pthread_mutex_lock(&ctx->data_cond_mutex); while ((ctx->run_flag)&&(ctx->head == head)) { pthread_cond_wait(&ctx->data_cond, &ctx->data_cond_mutex); } pthread_mutex_unlock(&ctx->data_cond_mutex); } } else { pthread_mutex_lock(&ctx->data_cond_mutex); while ((ctx->run_flag)&&(ctx->head == ctx->tail)) { pthread_cond_wait(&ctx->data_cond, &ctx->data_cond_mutex); } pthread_mutex_unlock(&ctx->data_cond_mutex); } } return NULL; } int fastwriter_push(fastwriter_t *ctx, size_t size, const void *data) { size_t part1, end; size_t free = fastwriter_compute_free_space(ctx); if (free < size) { ctx->max_usage = ctx->size; if (size > ctx->size) { return EOVERFLOW; } if ((ctx->flags&FASTWRITER_FLAGS_BLOCK)==0) return EWOULDBLOCK; pthread_mutex_lock(&ctx->space_cond_mutex); while ((ctx->run_flag)&&(fastwriter_compute_free_space(ctx) < size)) { pthread_cond_wait(&ctx->space_cond, &ctx->space_cond_mutex); } pthread_mutex_unlock(&ctx->space_cond_mutex); } else { end = ctx->size - (free - size); if (end > ctx->max_usage) ctx->max_usage = end; } if (!ctx->run_flag) { if (ctx->err) return ctx->err; return EBADFD; } if (ctx->pos < ctx->tail) end = ctx->tail; else end = ctx->size; part1 = end - ctx->pos; if (part1 < size) { // tail < pos (we have checked for free space) end = size - part1; fast_memcpy(ctx->buffer + ctx->pos, data, part1); fast_memcpy(ctx->buffer, data + part1, end); ctx->pos = end; } else { fast_memcpy(ctx->buffer + ctx->pos, data, size); ctx->pos += size; if (ctx->pos == ctx->size) ctx->pos = 0; } ctx->chunked += size; return 0; } int fastwriter_commit(fastwriter_t *ctx) { ctx->head = ctx->pos; pthread_mutex_lock(&ctx->data_cond_mutex); pthread_cond_broadcast(&ctx->data_cond); pthread_mutex_unlock(&ctx->data_cond_mutex); ctx->commited += ctx->chunked; ctx->chunked = 0; return 0; } int fastwriter_cancel(fastwriter_t *ctx) { ctx->pos = ctx->head; ctx->chunked = 0; return 0; } int fastwriter_push_data(fastwriter_t *ctx, size_t size, const void *buf) { int err; err = fastwriter_push(ctx, size, buf); if (err) return err; err = fastwriter_commit(ctx); if (err) fastwriter_cancel(ctx); return err; }