]> git.donarmstrong.com Git - rsem.git/blob - Buffer.h
Modified WHAT_IS_NEW
[rsem.git] / Buffer.h
1 #ifndef BUFFER_H_
2 #define BUFFER_H_
3
#include<cstdio>
#include<cstring>
#include<fstream>
#include<pthread.h>

#include "my_assert.h"
9
// Unsigned type used for buffer sizes and positions (counted in floats).
typedef unsigned long long int bufsize_type;
// Number of bytes occupied by one float, kept as an int.
static const int FLOATSIZE = static_cast<int>(sizeof(float));
12
13 class Buffer {
14 public:
15         // in_mem_arr must be allocated memory before the Buffer is constructed
16         Buffer(int nMB, int nSamples, int vlen, float* in_mem_arr, const char* tmpF) {
17                 cpos = 0;
18                 size = bufsize_type(nMB) * 1024 * 1024 / FLOATSIZE / vlen;
19                 if (size > (bufsize_type)nSamples) size = nSamples;
20                 general_assert(size > 0, "Memory allocated for credibility intervals is not enough!");
21                 size *= vlen;
22
23                 buffer = new float[size];
24                 ftmpOut.open(tmpF, std::ios::binary);
25                 pthread_mutex_init(&lock, NULL);
26
27                 fr = to = 0;
28                 this->nSamples = nSamples;
29                 this->vlen = vlen;
30                 this->in_mem_arr = in_mem_arr;
31         }
32
33         ~Buffer() {
34                 if (fr < to) flushToTempFile();
35
36                 delete[] buffer;
37                 pthread_mutex_destroy(&lock);
38                 ftmpOut.close();
39         }
40
41         void write(float value, float *vec) {
42                 pthread_assert(pthread_mutex_lock(&lock), "pthread_mutex_lock", "Error occurred while acquiring the lock!");
43                 if (size - cpos < bufsize_type(vlen)) flushToTempFile();
44                 in_mem_arr[to] = value;
45                 memcpy(buffer + cpos, vec, FLOATSIZE * vlen);
46                 cpos += vlen;
47                 ++to;
48                 pthread_assert(pthread_mutex_unlock(&lock), "pthread_mutex_unlock", "Error occurred while releasing the lock!");
49         }
50
51 private:
52         bufsize_type size, cpos; // cpos : current position
53
54         float *buffer;
55         float *in_mem_arr;
56         std::ofstream ftmpOut;
57         pthread_mutex_t lock;
58
59         int fr, to; // each flush, sample fr .. to - 1
60         int nSamples, vlen; // vlen : vector length
61
62         void flushToTempFile() {
63                 std::streampos gap1 = std::streampos(fr) * FLOATSIZE;
64                 std::streampos gap2 = std::streampos(nSamples - to) * FLOATSIZE;
65                 float *p = NULL;
66
67                 ftmpOut.seekp(0, std::ios::beg);
68                 for (int i = 0; i < vlen; i++) {
69                         p = buffer + i;
70                         ftmpOut.seekp(gap1, std::ios::cur);
71                         for (int j = fr; j < to; j++) {
72                                 ftmpOut.write((char*)p, FLOATSIZE);
73                                 p += vlen;
74                         }
75                         ftmpOut.seekp(gap2, std::ios::cur);
76                 }
77
78                 cpos = 0;
79                 fr = to;
80         }
81 };
82
83 #endif /* BUFFER_H_ */