]> git.treefish.org Git - phys/latlib.git/blob - configcache.cpp
Let outdir be created from rank 0.
[phys/latlib.git] / configcache.cpp
1 #include "configcache.h"
2
3 #include <stdlib.h>
4 #include <iostream>
5 #include <time.h>
6 #include <dirent.h>
7
8 #include <boost/iostreams/filtering_streambuf.hpp>
9 #include <boost/iostreams/stream.hpp>
10 #include <boost/iostreams/filter/bzip2.hpp>
11 #include <boost/iostreams/device/array.hpp>
12 #include <boost/iostreams/copy.hpp>
13
14 #define HEADER_READOK   0
15 #define HEADER_READERR  1
16 #define HEADER_READLAST 2
17
18 struct configcache::iobuffers
19 {
20   boost::iostreams::filtering_istreambuf *in;
21   boost::iostreams::filtering_ostreambuf *out;
22 };
23
24 configcache::configcache(const string& cacheid, const int& nequi, const int& nskip, const string& datadir, char **configmem, const int& configMemSize, const int& cachemode,
25                          ostream *_log){
26   log = _log;
27
28   NEQUI = nequi;
29   NSKIP = nskip;
30   DATADIR = datadir;
31   CACHEID = cacheid;
32
33   if ( cacheid.find("_") != -1 ) {
34     if(log) *log << "CCACHE: Invalid cacheid \"" << cacheid << "\" given. Cacheids must not contain underscores!" << endl << flush;
35     exit(1);
36   }
37
38   configMem = (char*)malloc(configMemSize);
39   tmpConfig = (char*)malloc(configMemSize);
40
41   *configmem = configMem;
42   configSize = configMemSize;
43
44   ioBuffers = new iobuffers;
45   ioBuffers->in = NULL;
46   ioBuffers->out = NULL;
47
48   MODE = cachemode;
49
50   refetchDataFiles = false;
51 }
52
53 string configcache::getFileId(int actnequi, const bool& shortid)
54 {
55   stringstream fileid;
56
57   if(!shortid) fileid << CACHEID << "_" << actnequi << "_" << NSKIP;
58   for(int ipara=0; ipara<Paras.size(); ipara++)
59     fileid << "_" << Paras[ipara].id << Paras[ipara].val;
60
61   return fileid.str();
62 }
63
64 void configcache::fetchDataFiles()
65 {
66   struct dirent *de=NULL;
67   DIR *d=NULL;
68   static infiledesc filedesc;
69   
70   d=opendir(DATADIR.c_str());
71   if(d != NULL){
72     while(de = readdir(d)){
73       string filename = de->d_name;
74       if(isValidInFile(filename, &filedesc)) 
75         {
76           inFiles.push_back(filedesc);
77         }
78     }
79   }
80 }
81
82 bool configcache::isValidInFile(const string& infile, infiledesc *filedesc)
83 {
84   char *inchar, *inParts;
85   string truncIn, truncOut;
86
87   filedesc->filename = infile;
88
89   if( infile.size() < 4 ) return false;
90
91   if( infile.substr(infile.size()-4) == ".dat" )
92     filedesc->extended = false;
93   else if( infile.substr(infile.size()-4) == "edat" )
94     filedesc->extended = true;
95   else
96     return false;
97
98   inchar = new char [infile.size()+1];
99   strcpy (inchar, infile.c_str());
100
101   inParts = strtok( inchar, "_" );
102   for(int iPart=0; inParts!=NULL; iPart++)
103     {
104       if( iPart>3 ) { truncIn += "_"; truncIn += inParts; }
105
106       switch(iPart)
107         {
108         case 1: if(inParts != CACHEID)
109             return false;
110           break;
111         case 2:
112           filedesc->nequi = atoi(inParts);
113           break;
114         case 3: 
115           if(atoi(inParts) != NSKIP) 
116             return false;
117           filedesc->nskip = atoi(inParts);
118           break;
119         }
120       inParts = strtok( NULL, "_");
121     }
122   truncIn = truncIn.substr(0, truncIn.size()-4);
123
124   delete[] inchar;
125
126   if( truncIn.find( getFileId(NEQUI, true) + "_" ) == string::npos ) return false;
127
128   return true;
129 }
130
131 int configcache::readHeader()
132 {
133   long unsigned int headersize;
134   
135   if( readDataToMem((char *)&headersize, sizeof(long unsigned int)) == sizeof(long unsigned int) && inFile.is_open() ) {
136     if ( headersize == 0 )
137       return HEADER_READLAST;
138
139     pair<unsigned long, void *> newHeader;
140
141     if( readDataToMem((char *)&newHeader.first, sizeof(unsigned long)) == sizeof(unsigned long) && inFile.is_open() ) {
142       newHeader.second = malloc(headersize);
143
144       if( readDataToMem((char *)newHeader.second, headersize) == headersize && inFile.is_open() ) {
145         headerStore.push_back(newHeader);
146         return HEADER_READOK;
147       }
148       else {
149         if(log) *log << "CCACHE: Could not read heade-data! Closing dat-file: " << openFileDesc.filename << endl << flush;
150         inFile.close();
151         return HEADER_READERR;
152       }
153     }
154     else {
155       if(log) *log << "CCACHE: Could not read headerid-hash! Closing dat-file: " << openFileDesc.filename << endl << flush;
156       inFile.close();
157       return HEADER_READERR;
158     }
159   }
160   else {
161     if(log) *log << "CCACHE: Could not read header size. Closing dat-file: " << openFileDesc.filename << endl << flush;
162     inFile.close();
163     return HEADER_READERR;
164   }
165 }
166
167 bool configcache::readAllHeaders()
168 {
169   int readHeaderStatus;
170
171   deleteHeaderStore();
172   
173   do {
174     readHeaderStatus = readHeader();
175   }
176   while ( readHeaderStatus == HEADER_READOK );
177
178   if ( readHeaderStatus == HEADER_READLAST ) return true;
179   else if ( readHeaderStatus == HEADER_READERR ) return false;
180 }
181
182 void * configcache::getHeader(const string& headerid) {
183   for (vector< pair<unsigned long, void *> >::iterator headerStoreIt = headerStore.begin(); headerStoreIt != headerStore.end(); ++headerStoreIt)
184     if ( headerStoreIt->first == hash(headerid) )
185       return headerStoreIt->second;
186   
187   return NULL;
188 }
189
190 void configcache::readConfig(bool *readnewconfig, int *nequileft, vector<unsigned long> *excludeFileHashes)
191 {
192   *readnewconfig = false;
193
194   if(DATADIR == "" || MODE == CACHE_MODE_DISABLED) return;
195
196   if(refetchDataFiles){
197     refetchDataFiles = false;
198     fetchDataFiles();
199   }
200
201   while(true)
202     {
203       vector<infiledesc>::iterator inFileIt = getNextInfile(excludeFileHashes);
204       int iDidVirtualSkips;
205
206       if( (!inFile.is_open()) && inFileIt == inFiles.end() ) {
207         if (*readnewconfig)
208           *nequileft = nequileft_internal;
209         return;
210       }
211
212       while( (!inFile.is_open()) && inFiles.size() > 0 ) {
213         openFileDesc = *inFileIt;
214
215         if (openFileDesc.nequi < NEQUI)
216           doVirtualEquilibration = true;
217         else
218           doVirtualEquilibration = false;
219
220         firstUsedConfig = true;
221
222         if(log) *log << "CCACHE: Opening dat-file: " << inFileIt->filename << endl << flush;
223         inFile.open( (DATADIR + "/" + inFileIt->filename).c_str(), std::ios::binary );
224         
225         inFiles.erase(inFileIt);
226         
227         if( !inFile.is_open() ) continue;
228
229         ioBuffers->in = new boost::iostreams::filtering_istreambuf;
230         ioBuffers->in->push( boost::iostreams::bzip2_decompressor() );
231         ioBuffers->in->push(inFile);
232       }
233
234       if( inFile.is_open() ) 
235         {
236           if (doVirtualEquilibration) {
237             if(log) *log << "CCACHE: Trying virtual equilibration." << endl << flush;
238             doVirtualEquilibration = false;
239             for (iDidVirtualSkips=0; iDidVirtualSkips < (NEQUI-openFileDesc.nequi)/openFileDesc.nskip; iDidVirtualSkips++) {
240               if( readFullBlock(tmpConfig, configSize) != configSize || ! inFile.is_open() )
241                 break;
242               else if ( (NEQUI-openFileDesc.nequi) - (iDidVirtualSkips+1)*openFileDesc.nskip < nequileft_internal ) {
243                 memcpy(configMem, tmpConfig, configSize);
244                 nequileft_internal = NEQUI - openFileDesc.nequi - (iDidVirtualSkips+1)*openFileDesc.nskip;
245                 *readnewconfig = true;
246                 firstUsedConfig = false;
247               }
248             }
249           }
250
251           if( readFullBlock(tmpConfig, configSize) == configSize && inFile.is_open() )
252             {
253               memcpy(configMem, tmpConfig, configSize);
254               *readnewconfig = true;
255               if (firstUsedConfig) {
256                 firstUsedConfig = false;
257                 if (openFileDesc.nequi < NEQUI)
258                   nequileft_internal = NEQUI - openFileDesc.nequi - iDidVirtualSkips*openFileDesc.nskip;
259                 else
260                   nequileft_internal = NEQUI - openFileDesc.nequi;
261               }
262               nequileft_internal -= openFileDesc.nskip;
263               *nequileft = nequileft_internal;
264               return;
265             }
266           else {
267             if(log) *log << "CCACHE: Could not read configuration. Closing dat-file: " << openFileDesc.filename << endl << flush;
268             inFile.close();
269           }
270         }
271     }
272 }
273
274 void configcache::openOutFile(int actnequi)
275
276   time_t secstamp = time(NULL);
277
278   outFileName.str("");
279   outFileName << DATADIR << "/" << secstamp << "_" << getFileId(actnequi) << "_.edat.tmp";
280
281   outFile.open( outFileName.str().c_str(), std::ios::binary );
282
283   ioBuffers->out = new boost::iostreams::filtering_ostreambuf;
284   ioBuffers->out->push(boost::iostreams::bzip2_compressor());
285   ioBuffers->out->push(outFile);
286 }
287
288 void configcache::writeHeader(const string& headerid, const char *header, long unsigned int size, int actnequi) {
289   unsigned long headeridhash;
290
291   if( DATADIR == "" || MODE < 2 ) return;
292
293   if(!outFile.is_open())
294     openOutFile(actnequi);
295
296   headeridhash = hash(headerid);
297
298   boost::iostreams::write(*ioBuffers->out, (char*)&size, sizeof(long unsigned int));
299   boost::iostreams::write(*ioBuffers->out, (char*)&headeridhash, sizeof(unsigned long));
300   boost::iostreams::write(*ioBuffers->out, header, size);
301 }
302
303 void configcache::writeConfig(int actnequi)
304 {
305   long unsigned int zeroheader=0;
306
307   if ( DATADIR == "" || MODE < 2 ) return;
308
309   if ( ! outFile.is_open() )
310     openOutFile(actnequi);
311   
312   boost::iostreams::write(*ioBuffers->out, (char*)&zeroheader, sizeof(long unsigned int));
313
314   boost::iostreams::write(*ioBuffers->out, configMem, configSize);
315 }
316
317 void configcache::addPara(const string& parid, const double& val){
318   parameter newPara;
319   newPara.id = parid;
320   newPara.val = val;
321   Paras.push_back(newPara);
322 }
323
324 int configcache::getParIndex(const string& parid){
325   for(int ipara=0; ipara<Paras.size(); ipara++)
326     if(Paras[ipara].id == parid) return ipara;
327 }
328
329 void configcache::setPara(const string& parid, const double& value){
330   Paras[getParIndex(parid)].val = value;
331
332   finishOutFile();
333   if(ioBuffers->in != NULL) { delete ioBuffers->in; ioBuffers->in=NULL; } 
334   inFile.close();
335   inFiles.clear();
336
337   refetchDataFiles = true;
338   nequileft_internal = NEQUI;
339 }
340
341 configcache::~configcache()
342 {
343   finishOutFile();
344   delete ioBuffers->in;
345   ioBuffers->in = NULL;
346 }
347
348 void configcache::finishOutFile()
349 {
350   if( ioBuffers->out != NULL )
351     {
352       delete ioBuffers->out;
353       ioBuffers->out = NULL;
354     }
355
356   if( outFile.is_open() )
357     {
358       outFile.close();
359       rename( outFileName.str().c_str(), outFileName.str().substr(0, outFileName.str().size()-4).c_str() );
360     }
361 }
362
363 int configcache::readFullBlock(char *tmpData, long unsigned int dataSize)
364 {
365   /* try to read header */
366   if ( openFileDesc.extended )
367     if ( ! readAllHeaders() ) 
368       return -1;
369
370   /* read data */
371   return readDataToMem(tmpData, dataSize);
372 }
373
374 int configcache::readDataToMem(char *tmpData, long unsigned int dataSize)
375 {
376   int readturn = -1;
377
378   if ( dataSize == 0 ) return 0;
379
380   try { readturn = boost::iostreams::read(*ioBuffers->in, tmpData, dataSize); }
381   catch(boost::iostreams::bzip2_error& error) { 
382     if(log) *log << "CCACHE: Caught bzip2 exception with error code: " << error.error() << endl << flush;
383     inFile.close();
384   } 
385   catch (std::exception const& ex) {
386     if(log) *log << "CCACHE: Caught exception: " << ex.what() << endl << flush;
387     inFile.close();
388   }
389   catch( ... ) {
390     if(log) *log << "CCACHE: Caught unknown exception while reading." << endl << flush;
391     inFile.close();
392   }
393
394   return readturn;
395 }
396
397 unsigned long configcache::hash(const string& str)
398 {
399   unsigned long hash = 5381;
400
401   for(string::const_iterator it=str.begin();it!=str.end();it++) 
402     hash = ((hash << 5) + hash) + *it; /* hash * 33 + character */
403
404   return hash;
405 }
406
407 void configcache::deleteHeaderStore()
408 {
409   while ( headerStore.size() > 0 ) {
410     free(headerStore.back().second);
411     headerStore.pop_back();
412   }
413 }
414
415 vector<infiledesc>::iterator configcache::getNextInfile(vector<unsigned long> *excludeFileHashes) {
416   for (vector<infiledesc>::iterator init = inFiles.begin(); init != inFiles.end(); ++init) {
417     if (excludeFileHashes != NULL) {
418       bool excludethisfile = false;
419
420       for (vector<unsigned long>::iterator exit = excludeFileHashes->begin(); exit != excludeFileHashes->end(); ++exit)
421         if ( *exit == hash(init->filename) ) {
422           excludethisfile = true;
423           break;
424         }
425
426       if (excludethisfile)
427         continue;
428     }
429     return init;
430   }
431   return inFiles.end();
432 }