/* default.c */
  1. #define _FASTWRITER_DEFAULT_C
  2. #define _GNU_SOURCE
  3. #define _XOPEN_SOURCE 600
  4. #define _POSIX_C_SOURCE 200112L
  5. #define _LARGEFILE64_SOURCE
  6. #include "config.h"
  7. #include <stdio.h>
  8. #include <stdlib.h>
  9. #include <string.h>
  10. #include <unistd.h>
  11. #include <limits.h>
  12. #include <errno.h>
  13. #include <pthread.h>
  14. #include <sys/types.h>
  15. #include <sys/stat.h>
  16. #include <sys/time.h>
  17. #include <fcntl.h>
  18. #ifdef HAVE_LINUX_FALLOC_H
  19. # include <linux/falloc.h>
  20. #endif /* HAVE_LINUX_FALLOC_H */
  21. #ifndef DISABLE_XFS_REALTIME
  22. # include <xfs/xfs.h>
  23. #endif /* !DISABLE_XFS_REALTIME */
  24. #include "fastwriter.h"
  25. #include "private.h"
  26. #include "sysinfo.h"
  27. #include "default.h"
  /* Feature switches; they carry no value and are only tested for being defined.
   * SYNC_MODE makes fastwriter_default_open() start with sync_mode enabled.
   * HAVE_FALLOCATE is not referenced in the visible part of this file. */
  #define SYNC_MODE
  #define HAVE_FALLOCATE

  /* Block sizes in bytes, assigned to wr_block / pa_block depending on the
   * detected file system. */
  #define EXT4_WRITEBLOCK  4194304      /* 4 MiB write block (raw, ext4, btrfs, xfs) */
  #define EXT4_PREALLOCATE 1073741824   /* 1 GiB preallocation step */
  #define OCFS_WRITEBLOCK  262144       /* 256 KiB write block (ocfs2 with AIO, unknown file systems) */

  /* Kernel AIO bookkeeping: number of iocb slots and length of the ring that
   * tracks submitted chunks. AIO_QUEUE_LENGTH must not exceed AIO_BUFFERS. */
  #define AIO_QUEUE_LENGTH 4
  #define AIO_BUFFERS      8
  35. #ifndef DISABLE_AIO
  36. # include <libaio.h>
  37. # if AIO_QUEUE_LENGTH > AIO_BUFFERS
  38. # error "AIO_QUEUE_LENGTH > AIO_BUFFERS"
  39. # endif
  40. #endif /* DISABLE_AIO */
  #ifndef DISABLE_AIO
  /**
   * Bookkeeping record for one chunk handed to kernel AIO.
   * Records live in the data[] ring of fastwriter_default_t and are attached to
   * the iocb as its callback pointer, so they come back in io_event.data.
   */
  typedef struct {
      size_t offset;  /**< file offset the chunk is written to */
      size_t size;    /**< number of bytes submitted for the chunk */
      int ios;        /**< iocb slot used for the chunk; handed back to the free list on completion */
      int ready;      /**< 0 - unused, 1 - processing, 2 - done */
  } fastwriter_data_t;
  #endif /* !DISABLE_AIO */
  49. typedef struct {
  50. int fd;
  51. int sync_mode; /**< Open with O_DIRECT flag to avoid caches */
  52. int aio_mode; /**< Use kernel AIO (libaio.h) */
  53. size_t prior_size; /**< original size of file */
  54. size_t preallocated; /**< preallocated bytes */
  55. size_t wr_block; /**< minimal block of data to write */
  56. size_t pa_block; /**< preallocation setp */
  57. #ifndef DISABLE_AIO
  58. io_context_t aio;
  59. int ios_ready_n;
  60. int ios_ready[AIO_QUEUE_LENGTH];
  61. struct iocb ios[AIO_QUEUE_LENGTH];
  62. int data_head, data_tail;
  63. fastwriter_data_t data[AIO_BUFFERS];
  64. int ios_status[AIO_QUEUE_LENGTH];
  65. size_t sched; /**< how far we ahead of currently writted head */
  66. size_t fd_offset; /**< current file offset */
  67. int page_size;
  68. #endif /* !DISABLE_AIO */
  69. } fastwriter_default_t;
  70. int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags_t flags) {
  71. int err;
  72. char fs[16];
  73. #ifndef DISABLE_XFS_REALTIME
  74. struct fsxattr attr;
  75. #endif /* !DISABLE_XFS_REALTIME */
  76. int open_flags = (O_CREAT|O_WRONLY|O_NOATIME|O_LARGEFILE);
  77. int open_mode = (S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
  78. fastwriter_default_t *ctx;
  79. err = fastwriter_get_file_fs(name, sizeof(fs) - 1, fs);
  80. if (err) return err;
  81. ctx = (fastwriter_default_t*)malloc(sizeof(fastwriter_default_t));
  82. if (!ctx) return ENOMEM;
  83. memset(ctx, 0, sizeof(fastwriter_default_t));
  84. fw->ctx = ctx;
  85. #ifdef SYNC_MODE
  86. ctx->sync_mode = 1;
  87. #endif /* SYNC_MODE */
  88. ctx->prior_size = 0;
  89. if (!strcmp(fs, "raw")) {
  90. ctx->wr_block = EXT4_WRITEBLOCK;
  91. ctx->pa_block = 0;
  92. ctx->prior_size = (size_t)-1;
  93. open_flags &= ~(O_CREAT|O_NOATIME|O_LARGEFILE);
  94. } else if (!strcmp(fs, "ext4")) {
  95. ctx->wr_block = EXT4_WRITEBLOCK;
  96. ctx->pa_block = EXT4_PREALLOCATE;
  97. } else if (!strcmp(fs, "btrfs")) {
  98. ctx->wr_block = EXT4_WRITEBLOCK;
  99. ctx->pa_block = EXT4_PREALLOCATE;
  100. } else if (!strcmp(fs, "xfs")) {
  101. ctx->wr_block = EXT4_WRITEBLOCK;
  102. ctx->pa_block = EXT4_PREALLOCATE;
  103. } else if (!strcmp(fs, "ocfs2")) {
  104. #ifndef DISABLE_AIO
  105. ctx->aio_mode = 1;
  106. ctx->sync_mode = 0;
  107. ctx->wr_block = OCFS_WRITEBLOCK;
  108. #else /* !DISABLE_AIO */
  109. ctx->wr_block = EXT4_WRITEBLOCK;
  110. #endif /* !DISABLE_AIO */
  111. ctx->pa_block = EXT4_PREALLOCATE;
  112. /* } else if (!strcmp(fs, "fhgfs")) {
  113. ctx->sync_mode = 0;
  114. ctx->wr_block = OCFS_WRITEBLOCK;
  115. ctx->pa_block = EXT4_PREALLOCATE;
  116. } else if (strstr(fs, "gluster")) {
  117. ctx->sync_mode = 0;
  118. ctx->wr_block = OCFS_WRITEBLOCK;
  119. ctx->pa_block = EXT4_PREALLOCATE;*/
  120. } else {
  121. ctx->sync_mode = 0;
  122. ctx->wr_block = OCFS_WRITEBLOCK;
  123. ctx->pa_block = 0;
  124. }
  125. if (ctx->sync_mode) {
  126. open_flags |= O_DIRECT;
  127. }
  128. if (flags&FASTWRITER_FLAGS_OVERWRITE)
  129. open_flags |= O_TRUNC;
  130. ctx->fd = open(name, open_flags, open_mode);
  131. if (ctx->fd < 0) {
  132. // Running as normal user, try to disable direct mode
  133. if ((errno == EINVAL)&&(ctx->sync_mode)) {
  134. ctx->sync_mode = 0;
  135. open_flags &= ~O_DIRECT;
  136. ctx->fd = open(name, open_flags, open_mode);
  137. }
  138. if (ctx->fd < 0) return errno;
  139. }
  140. if (((open_flags&FASTWRITER_FLAGS_OVERWRITE)==0)&&(strcmp(fs, "raw"))) {
  141. ctx->prior_size = lseek64(ctx->fd, 0, SEEK_END);
  142. if (ctx->prior_size%FASTWRITER_SYNCIO_ALIGN) {
  143. close(ctx->fd);
  144. ctx->fd = open(name, open_flags&~O_DIRECT, open_mode);
  145. if (ctx->fd < 0) return errno;
  146. ctx->prior_size = lseek64(ctx->fd, 0, SEEK_END);
  147. ctx->sync_mode = 0;
  148. ctx->aio_mode = 0;
  149. }
  150. }
  151. #ifndef DISABLE_XFS_REALTIME
  152. if (!strcmp(fs, "xfs")) {
  153. err = xfsctl (name, ctx->fd, XFS_IOC_FSGETXATTR, (void *) &attr);
  154. if (!err) {
  155. attr.fsx_xflags |= XFS_XFLAG_REALTIME;
  156. err = xfsctl (name, ctx->fd, XFS_IOC_FSSETXATTR, (void *) &attr);
  157. if (err) fprintf(stderr, "Error initializing XFS real-time mode (%i), disabling...\n", err);
  158. }
  159. }
  160. #endif /* !DISABLE_XFS_REALTIME */
  161. #ifndef DISABLE_AIO
  162. if (ctx->aio_mode) {
  163. int i;
  164. ctx->page_size = getpagesize();
  165. ctx->fd_offset = ctx->prior_size;
  166. ctx->ios_ready_n = AIO_QUEUE_LENGTH;
  167. for (i = 0; i < AIO_QUEUE_LENGTH; i++) {
  168. ctx->ios_ready[i] = i;
  169. }
  170. err = io_queue_init(AIO_QUEUE_LENGTH, &ctx->aio);
  171. if (err) {
  172. fprintf(stderr, "Error initializing AIO mode (%i), disabling...\n", -err);
  173. ctx->aio_mode = 0;
  174. }
  175. }
  176. #endif /* !DISABLE_AIO */
  177. ctx->preallocated = 0;
  178. return 0;
  179. }
  180. void fastwriter_default_close(fastwriter_t *fw) {
  181. if (fw->ctx) {
  182. fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx;
  183. if (ctx->fd >= 0) {
  184. #ifndef DISABLE_AIO
  185. if ((ctx->aio_mode)&&(ctx->aio)) {
  186. int n_ev;
  187. struct io_event ev[AIO_QUEUE_LENGTH];
  188. while (ctx->ios_ready_n < AIO_QUEUE_LENGTH) {
  189. n_ev = io_getevents(ctx->aio, 1, AIO_QUEUE_LENGTH, &ev[0], NULL);
  190. if (n_ev <= 0) {
  191. fprintf(stderr, "AIO io_getevents have failed (%i)", -n_ev);
  192. break;
  193. }
  194. ctx->ios_ready_n += n_ev;
  195. }
  196. io_queue_release(ctx->aio);
  197. }
  198. #endif /* DISABLE_AIO */
  199. #ifdef HAVE_LINUX_FALLOC_H
  200. if (ctx->prior_size != (size_t)-1) {
  201. #else /* HAVE_LINUX_FALLOC_H */
  202. if ((ctx->prior_size != (size_t)-1)&&((ctx->sync_mode)||(ctx->aio_mode))) {
  203. #endif /* HAVE_LINUX_FALLOC_H */
  204. ftruncate(ctx->fd, ctx->prior_size + fw->written);
  205. }
  206. close(ctx->fd);
  207. }
  208. free(ctx);
  209. fw->ctx = NULL;
  210. }
  211. }
  212. int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, size_t size, void *data, size_t *written) {
  213. size_t sum = 0;
  214. size_t delta = 0;
  215. ssize_t res;
  216. fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx;
  217. if ((flags&FASTWRITER_WRITE_FLAG_FORCE)==0) {
  218. if (size < ctx->wr_block) {
  219. *written = 0;
  220. return 0;
  221. }
  222. size -= size % ctx->wr_block;
  223. }
  224. if ((ctx->pa_block)&&((fw->written + size) > ctx->preallocated)) {
  225. #ifdef HAVE_LINUX_FALLOC_H
  226. if (fallocate(ctx->fd, FALLOC_FL_KEEP_SIZE, ctx->preallocated, ctx->pa_block)) {
  227. #else /* HAVE_LINUX_FALLOC_H */
  228. if (posix_fallocate(ctx->fd, ctx->preallocated, ctx->pa_block)) {
  229. #endif /* HAVE_LINUX_FALLOC_H */
  230. ctx->pa_block = 0;
  231. } else {
  232. ctx->preallocated += ctx->pa_block;
  233. }
  234. }
  235. // we expect this to happen only at last iteration (buffer is multiply of the required align)
  236. if (((ctx->aio_mode)||(ctx->sync_mode))&&(size%FASTWRITER_SYNCIO_ALIGN)) {
  237. delta = FASTWRITER_SYNCIO_ALIGN - size%FASTWRITER_SYNCIO_ALIGN;
  238. }
  239. #ifndef DISABLE_AIO
  240. if (ctx->aio_mode) {
  241. int err;
  242. size_t done = 0;
  243. size_t sched = 0;
  244. fastwriter_data_t *iodata;
  245. struct iocb *newio;
  246. size_t wr_block = ctx->wr_block;
  247. do {
  248. if (!ctx->ios_ready_n) {
  249. int i, n_ev;
  250. struct io_event ev[AIO_QUEUE_LENGTH];
  251. n_ev = io_getevents(ctx->aio, 1, AIO_QUEUE_LENGTH, &ev[0], NULL);
  252. if (n_ev <= 0) {
  253. fprintf(stderr, "AIO io_getevents have failed (%i)", -n_ev);
  254. return -n_ev;
  255. }
  256. for (i = 0; i < n_ev; i++) {
  257. fastwriter_data_t *ev_data = (fastwriter_data_t *)(ev[i].data);
  258. if ((ev[i].res2)||(ev[i].res < ev_data->size)) {
  259. fprintf(stderr, "AIO write failed (res: %li, res2: %li, expected: %zu), no handling data will be corrupted...\n", ev[i].res, ev[i].res2, ev_data->size);
  260. return -ev[i].res2;
  261. }
  262. ctx->ios_ready[ctx->ios_ready_n++] = ev_data->ios;
  263. // printf("Data: %i (ios %i)\n", ev_data->ready, ev_data->ios);
  264. ev_data->ready = 2;
  265. }
  266. while (ctx->data[ctx->data_tail].ready > 1) {
  267. // printf("Done: %i %zu\n", ctx->data_tail, ctx->data[ctx->data_tail].offset);
  268. ctx->data[ctx->data_tail].ready = 0;
  269. done += ctx->data[ctx->data_tail].size;
  270. if ((++ctx->data_tail) == AIO_BUFFERS) ctx->data_tail = 0;
  271. }
  272. }
  273. if ((ctx->sched + sched) < size) {
  274. if ((ctx->data_head == ctx->data_tail)&&(ctx->data[ctx->data_head].ready)) continue;
  275. newio = (struct iocb*)&ctx->ios[ctx->ios_ready[--ctx->ios_ready_n]];
  276. iodata = &ctx->data[ctx->data_head];
  277. if (wr_block > ((size + delta) - (ctx->sched + sched))) {
  278. wr_block = (size + delta) - (ctx->sched + sched);
  279. if (wr_block % ctx->page_size) {
  280. fprintf(stderr, "We need to write incomplete page (%zu bytes). This is no supported yet...\n", wr_block);
  281. return -1;
  282. }
  283. }
  284. // printf("Sched: %lu => %lu (%lu) [tail %lu, head %lu]\n", ctx->sched + sched, ctx->fd_offset, wr_block, ctx->data_tail, ctx->data_head);
  285. iodata->offset = ctx->fd_offset;
  286. iodata->size = wr_block;
  287. iodata->ios = ctx->ios_ready_n;
  288. io_prep_pwrite(newio, ctx->fd, data + ctx->sched + sched, wr_block, ctx->fd_offset);
  289. io_set_callback(newio, (void*)iodata);
  290. err = io_submit(ctx->aio, 1, &newio);
  291. if (err != 1) {
  292. fprintf(stderr, "Error submiting AIO job (%i)\n", -err);
  293. return -err;
  294. }
  295. iodata->ready = 1;
  296. sched += wr_block;
  297. ctx->fd_offset += wr_block;
  298. if ((++ctx->data_head) == AIO_BUFFERS) ctx->data_head = 0;
  299. }
  300. } while (!done);
  301. ctx->sched += sched - done;
  302. size = done;
  303. } else {
  304. #endif /* !DISABLE_AIO */
  305. do {
  306. res = write(ctx->fd, data + sum, size + delta - sum);
  307. if (res < 0) {
  308. *written = sum;
  309. return errno;
  310. }
  311. sum += res;
  312. } while (sum < size);
  313. #ifndef DISABLE_AIO
  314. }
  315. #endif /* !DISABLE_AIO */
  316. if ((ctx->sync_mode)||(ctx->aio_mode)) {
  317. posix_fadvise(ctx->fd, fw->written, size, POSIX_FADV_DONTNEED);
  318. }
  319. *written = size;
  320. return 0;
  321. }