]> git.treefish.org Git - phys/latlib.git/blob - writeout.cpp
Added writeoutdir sequencing for equal timestamps.
[phys/latlib.git] / writeout.cpp
1 #include "writeout.h"
2
3 #include <sys/types.h>
4 #include <sys/stat.h>
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <iostream>
8 #include <time.h>
9 #include <sstream>
10 #include <dirent.h>
11 #include <errno.h>
12 #include <unistd.h>
13 #include <mpi.h>
14
15 using namespace std;
16
17 string writeout::longToStr (long arg)
18 {
19   stringstream ss;
20   ss << arg;
21   return ss.str();
22 }
23
24 void writeout::newsub(string subname) {
25   of[subname] = new ofstream;
26
27   if ( fulldir != "" ) {
28     if(rank>0) of[subname]->open( (fulldir + "/rank" + cRank + "-" + subname + ".tmp").c_str() );
29     else of[subname]->open( (fulldir + "/" + signature + "-" + subname + ".dat").c_str() );
30
31     if ( !of[subname]->is_open() ) {
32       logf << "WRITEOUT: Could not open output-file!" << endl << flush;
33       exit(1);
34     }
35
36     buf[subname] = of[subname]->rdbuf();
37   }
38   else
39     buf[subname] = cout.rdbuf();
40
41   out[subname] = new ostream(buf[subname]);
42 }
43
44 writeout::writeout(const string& wdir, const string& _signature, 
45                    const int& _rank, const int& procs)
46 {
47   long timestamp;
48   int iseq=0;
49
50   fulldir = "";
51   signature = _signature;
52   rank = _rank;
53
54   if(wdir != ""){
55     numprocs = procs;
56     sprintf(cRank, "%d", rank);
57
58     if (rank == 0) {
59       timestamp = time(NULL);
60
61       while (true) {
62         fulldir = wdir + "/" + longToStr(timestamp) + "." + longToStr(iseq) + "_" + signature + ".tmp";
63         if ( mkdir(fulldir.c_str(), 0775) == 0 )
64           break;
65         else if ( errno != EEXIST ) {
66           cerr << "WRITEOUT: Could not create outdir!" << endl << flush;
67           break;
68         }
69         iseq++;
70       }
71
72 #ifndef MPI_DISABLED
73       for(int idest=1; idest<numprocs; idest++) {
74         MPI_Send(&timestamp, 1, MPI_LONG, idest, 123, MPI_COMM_WORLD);
75         MPI_Send(&iseq, 1, MPI_LONG, idest, 124, MPI_COMM_WORLD);
76       }
77 #endif
78
79     }
80
81 #ifndef MPI_DISABLED
82     else if(rank>0) {
83       MPI_Recv(&timestamp, 1, MPI_LONG, 0, 123, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
84       MPI_Recv(&iseq, 1, MPI_LONG, 0, 124, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
85       fulldir = wdir + "/" + longToStr(timestamp) + "." + longToStr(iseq) + "_" + signature + ".tmp";
86     }
87 #endif  
88     
89     logf.open( (fulldir + "/rank" + cRank + ".log").c_str() );
90
91     if ( !logf.is_open() ) {
92       cerr << "WRITEOUT: Could not open log-file!" << endl << flush;
93       exit(1);
94     }
95
96     logf << "[ " << timestring() << " ] Log starts here." << endl;
97     
98     logbuf = logf.rdbuf();
99   }
100   else{
101     logbuf = cerr.rdbuf();
102   }
103   log = new ostream(logbuf);
104 }
105
106 string writeout::timestring()
107 {
108   time_t rawtime;
109   string timestring;
110   time( &rawtime );
111   timestring = asctime( localtime( &rawtime ) );
112   return timestring.substr(0, timestring.size()-1);;
113 }
114
115 writeout::~writeout()
116 {
117   if(fulldir != "") {
118     for (map<string,ofstream*>::iterator ofit = of.begin(); ofit != of.end(); ++ofit) {
119       if( cRank[0] == '0' ) {
120         int jobsdone=0;
121         while(jobsdone<numprocs-1) {
122           string nextfile;
123           if( (nextfile = getdatfile(ofit->first)) == "" ) 
124             sleep(1);
125           else {
126             logf << "collecting " << nextfile << endl;
127                     
128             ifstream myfile( (fulldir + "/" + nextfile).c_str() );
129             while(true) {
130               string line;
131               getline(myfile, line);
132               if( !myfile.good() ) break;
133               *ofit->second << line << endl << flush;
134             }
135             myfile.close();
136             remove( (fulldir + "/" + nextfile).c_str() );
137             jobsdone++;
138           }
139         }
140         *ofit->second << "#end" << endl << flush;
141         ofit->second->close();
142       }
143       else {
144         ofit->second->close();
145         rename((fulldir + "/rank" + cRank + "-" + ofit->first + ".tmp").c_str(),
146                (fulldir + "/rank" + cRank + "-" + ofit->first + ".part").c_str());
147       }
148     }
149     if( cRank[0] == '0' )
150       rename( fulldir.c_str(), fulldir.substr(0, fulldir.length()-4).c_str() );
151   }
152   logf << "[ " << timestring() << " ] Log ends here." << endl;
153   logf.close();
154 }
155
156 string writeout::getdatfile(string subname)
157 {
158   string myfile;
159   DIR *dp;
160   struct dirent *dirp;
161
162   if((dp  = opendir(fulldir.c_str())) == NULL) {
163     logf << "Error(" << errno << ") opening " << fulldir << endl;
164     closedir(dp);
165     return "";
166   }
167   
168   while ((dirp = readdir(dp)) != NULL)
169     {
170       myfile = string(dirp->d_name);
171
172       if(myfile.length() > 3 && myfile.substr(myfile.length()-4) == "part" &&
173          subname == myfile.substr( myfile.find("-")+1, myfile.rfind(".")-myfile.find("-")-1 ) ) {
174         closedir(dp);
175         return myfile;
176       }
177     }
178
179   closedir(dp);
180   return "";
181 }