Home > matlab > parallel > masterParallel.m

masterParallel

PURPOSE ^

PARALLEL CONTEXT

SYNOPSIS ^

function [fOutVar,nBlockPerCPU, totCPU] = masterParallel(Parallel,fBlock,nBlock,NamFileInput,fname,fInputVar,fGlobalVar,Parallel_info,initialize)

DESCRIPTION ^

 PARALLEL CONTEXT
 This is the most important function for the management of DYNARE parallel
 computing.
 It is the top-level function called on the master computer when parallelizing a task.

CROSS-REFERENCE INFORMATION ^

This function calls: This function is called by:

SOURCE CODE ^

0001 function [fOutVar,nBlockPerCPU, totCPU] = masterParallel(Parallel,fBlock,nBlock,NamFileInput,fname,fInputVar,fGlobalVar,Parallel_info,initialize)
0002 % PARALLEL CONTEXT
0003 % This is the most important function for the management of DYNARE parallel
0004 % computing.
0005 % It is the top-level function called on the master computer when parallelizing a task.
0006 
0007 % This function has two main computational strategies for managing the MATLAB workers (slave processes).
0008 % 0 Simple Close/Open Strategy:
0009 % In this case the new MATLAB instances (slave processes) are opened when
0010 % necessary and then closed. This can happen many times during the
0011 % simulation of a model.
0012 
0013 % 1 Always Open Strategy:
0014 % In this case we have a more sophisticated management of slave processes,
0015 % which are no longer closed at the end of each job. The slave processes
0016 % wait for a new job (if one exists). If a slave does not receive a new job after a
0017 % fixed time it is destroyed. This solution removes the computational
0018 % time necessary to Open/Close new MATLAB instances.
0019 
0020 % The first (point 0) is the default Strategy
0021 % i.e.(Parallel_info.leaveSlaveOpen=0). This value can be changed by the
0022 % user in the xxx.mod file, or it is changed by the programmer if it is necessary to
0023 % reduce the overall computational time. See for example the
0024 % prior_posterior_statistics.m.
0025 
0026 % The number of parallelized threads will be equal to (nBlock-fBlock+1).
0027 %
0028 % INPUTS
0029 %  o Parallel [struct vector]   copy of options_.parallel
0030 %  o fBlock [int]               index number of the first thread
0031 %                               (between 1 and nBlock)
0032 %  o nBlock [int]               index number of the last thread
0033 %  o NamFileInput [cell array]  contains the list of input files to be
0034 %                               copied in the working directory of remote slaves
0035 %                               2 columns, as many lines as there are files
0036 %                               - first column contains directory paths
0037 %                               - second column contains filenames
0038 %  o fname [string]             name of the function to be parallelized, and
0039 %                               which will be run on the slaves
0040 %  o fInputVar [struct]         structure containing local variables to be used
0041 %                               by fName on the slaves
0042 %  o fGlobalVar [struct]        structure containing global variables to be used
0043 %                               by fName on the slaves
0044 %  o Parallel_info              []
0045 %  o initialize                 []
0046 %
0047 % OUTPUT
0048 %  o fOutVar [struct vector]   result of the parallel computation, one
0049 %                              struct per thread
0050 %  o nBlockPerCPU [int vector] for each CPU used, indicates the number of
0051 %                              threads run on that CPU
0052 %  o totCPU [int]              total number of CPU used (can be lower than
0053 %                              the number of CPU declared in "Parallel", if
0054 %                              the number of required threads is lower)
0055 
0056 % Copyright (C) 2009-2011 Dynare Team
0057 %
0058 % This file is part of Dynare.
0059 %
0060 % Dynare is free software: you can redistribute it and/or modify
0061 % it under the terms of the GNU General Public License as published by
0062 % the Free Software Foundation, either version 3 of the License, or
0063 % (at your option) any later version.
0064 %
0065 % Dynare is distributed in the hope that it will be useful,
0066 % but WITHOUT ANY WARRANTY; without even the implied warranty of
0067 % MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0068 % GNU General Public License for more details.
0069 %
0070 % You should have received a copy of the GNU General Public License
0071 % along with Dynare.  If not, see <http://www.gnu.org/licenses/>.
0072 
0073 
0074 % If islocal==0, create a new directory for remote computation.
0075 % This directory is named using the current date and time,
0076 % is used only once and then deleted.
0077 
0078 persistent PRCDir
0079 % PRCDir = Present Remote Computational Directory!
0080 
0081 Strategy=Parallel_info.leaveSlaveOpen;
0082 
0083 islocal = 1;
0084 for j=1:length(Parallel),
0085     islocal=islocal*Parallel(j).Local;
0086 end
0087 if nargin>8 && initialize==1
0088     if islocal == 0,
0089         PRCDir=CreateTimeString();
0090         assignin('base','PRCDirTmp',PRCDir),
0091         evalin('base','options_.parallel_info.RemoteTmpFolder=PRCDirTmp;')
0092         evalin('base','clear PRCDirTmp,')
0093     else
0094         % Delete the traces (if existing) of last local session of computations.
0095         if Strategy==1,
0096             mydelete(['slaveParallel_input*.mat']);
0097         end
0098     end
0099     return
0100 end
0101 
0102 
0103 % Deactivate some 'Parallel/Warning' messages in Octave!
0104 % Comment out the line 'warning('off');' in order to view the warning messages
0105 % in Octave!
0106 
0107 if exist('OCTAVE_VERSION'),
0108     warning('off');
0109 end
0110 
0111 
0112 if Strategy==1
0113     totCPU=0;
0114 end
0115 
0116 
0117 % Determine my hostname and my working directory.
0118 
0119 DyMo=pwd;
0120 % fInputVar.DyMo=DyMo;
0121 if ispc, % ~(isunix || (~matlab_ver_less_than('7.4') && ismac)) ,
0122     [tempo, MasterName]=system('hostname');
0123     MasterName=deblank(MasterName);
0124 end
0125 % fInputVar.MasterName = MasterName;
0126 
0127 
0128 % Save input data for use by the slaves.
0129 switch Strategy
0130     case 0
0131         if exist('fGlobalVar'),
0132             save([fname,'_input.mat'],'fInputVar','fGlobalVar')
0133         else
0134             save([fname,'_input.mat'],'fInputVar')
0135         end
0136         save([fname,'_input.mat'],'Parallel','-append')
0137         
0138     case 1
0139         if exist('fGlobalVar'),
0140             save(['temp_input.mat'],'fInputVar','fGlobalVar')
0141         else
0142             save(['temp_input.mat'],'fInputVar')
0143         end
0144         save(['temp_input.mat'],'Parallel','-append')
0145         closeSlave(Parallel,PRCDir,-1);
0146 end
0147 
0148 
0149 % Determine the total number of available CPUs, and the number of threads
0150 % to run on each CPU.
0151 
0152 [nCPU, totCPU, nBlockPerCPU, totSlaves] = distributeJobs(Parallel, fBlock, nBlock);
0153 for j=1:totSlaves,
0154     PRCDirSnapshot{j}={};
0155 end
0156 offset0 = fBlock-1;
0157 
0158 % Clean up remnants of previous runs.
0159 mydelete(['comp_status_',fname,'*.mat']);
0160 mydelete(['P_',fname,'*End.txt']);
0161 mydelete([fname,'_output_*.mat']);
0162 
0163 
0164 % Create a shell script containing the commands to launch the required
0165 % tasks on the slaves.
0166 fid = fopen('ConcurrentCommand1.bat','w+');
0167 
0168 
0169 % Create the directory devoted to remote computation.
0170 if isempty(PRCDir) && ~islocal,
0171     error('PRCDir not initialized!')
0172 else
0173     dynareParallelMkDir(PRCDir,Parallel(1:totSlaves));
0174 end
0175 
0176 % Testing Zone
0177 
0178 % 1. Display the User Strategy:
0179 
0180 % if Strategy==0
0181 %     disp('User Strategy Now Is Open/Close (0)');
0182 % else
0183 %     disp('User Strategy Now Is Always Open (1)');
0184 % end
0185 
0186 
0187 % 2. Display the output of 'NEW' distributeJobs.m:
0188 %
0189 % fBlock
0190 % nBlock
0191 %
0192 %
0193 % nCPU
0194 % totCPU
0195 % nBlockPerCPU
0196 % totSlaves
0197 %
0198 % keyboard
0199 
0200 % End
0201 
0202 for j=1:totCPU,
0203     
0204     if Strategy==1
0205         command1 = ' ';
0206     end
0207     
0208     indPC=min(find(nCPU>=j));
0209     
0210     % According to the information contained in the configuration file, compThread can limit MATLAB
0211     % to a single computational thread. By default, MATLAB makes use of the multithreading
0212     % capabilities of the computer on which it is running. Nevertheless,
0213     % experimental results show that MATLAB's native
0214     % multithreading limits performance when parallel computing is active.
0215     
0216     
0217     if strcmp('true',Parallel(indPC).SingleCompThread),
0218         compThread = '-singleCompThread';
0219     else
0220         compThread = '';
0221     end
0222     
0223     if indPC>1
0224         nCPU0 = nCPU(indPC-1);
0225     else
0226         nCPU0=0;
0227     end
0228     offset = sum(nBlockPerCPU(1:j-1))+offset0;
0229     
0230     % Create a file used to monitor whether a parallel block (core)
0231     % computation is finished or not.
0232     
0233     fid1=fopen(['P_',fname,'_',int2str(j),'End.txt'],'w+');
0234     fclose(fid1);
0235     
0236     if Strategy==1,
0237         
0238         fblck = offset+1;
0239         nblck = sum(nBlockPerCPU(1:j));
0240         save temp_input.mat fblck nblck fname -append;
0241         copyfile('temp_input.mat',['slaveJob',int2str(j),'.mat']);
0242         if Parallel(indPC).Local ==0,
0243             fid1=fopen(['stayalive',int2str(j),'.txt'],'w+');
0244             fclose(fid1);
0245             dynareParallelSendFiles(['stayalive',int2str(j),'.txt'],PRCDir,Parallel(indPC));
0246             mydelete(['stayalive',int2str(j),'.txt']);
0247         end
0248         % Wait for possibly local alive CPU to start the new job or close by
0249         % internal criteria.
0250         pause(1);
0251         newInstance = 0;
0252         
0253         % Check if j CPU is already alive.
0254         if isempty(dynareParallelDir(['P_slave_',int2str(j),'End.txt'],PRCDir,Parallel(indPC)));
0255             fid1=fopen(['P_slave_',int2str(j),'End.txt'],'w+');
0256             fclose(fid1);
0257             if Parallel(indPC).Local==0,
0258                 dynareParallelSendFiles(['P_slave_',int2str(j),'End.txt'],PRCDir,Parallel(indPC));
0259                 delete(['P_slave_',int2str(j),'End.txt']);
0260             end
0261             
0262             newInstance = 1;
0263             storeGlobalVars( ['slaveParallel_input',int2str(j),'.mat']);
0264             save( ['slaveParallel_input',int2str(j),'.mat'],'Parallel','-append');
0265             % Prepare global vars for Slave.
0266         end
0267     else
0268         
0269         % If the computation is executed remotely, all the necessary files
0270         % are created locally, then copied to the remote directory and then
0271         % deleted (locally)!
0272         
0273         save( ['slaveParallel_input',int2str(j),'.mat'],'Parallel');
0274         
0275         if Parallel(indPC).Local==0,
0276             dynareParallelSendFiles(['P_',fname,'_',int2str(j),'End.txt'],PRCDir,Parallel(indPC));
0277             delete(['P_',fname,'_',int2str(j),'End.txt']);
0278             
0279             dynareParallelSendFiles(['slaveParallel_input',int2str(j),'.mat'],PRCDir,Parallel(indPC));
0280             delete(['slaveParallel_input',int2str(j),'.mat']);
0281             
0282         end
0283         
0284     end
0285     
0286     % TO BE CONDENSED:
0287     
0288     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
0289     % The following 'switch - case' code is the core of this function!
0290     switch Strategy
0291         case 0
0292             
0293             if Parallel(indPC).Local == 1,                                  % 0.1 Run on the local machine (localhost).
0294                 
0295                 if ~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem), % Hybrid computing Windows <-> Unix!
0296                     if strfind([Parallel(indPC).MatlabOctavePath], 'octave') % Hybrid computing Matlab(Master)->Octave(Slaves) and Vice Versa!
0297                         command1=['octave --eval "default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')" &'];
0298                     else
0299                         command1=[Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')" &'];
0300                     end
0301                 else    % Hybrid computing Matlab(Master)->Octave(Slaves) and Vice Versa!
0302                     if  strfind([Parallel(indPC).MatlabOctavePath], 'octave')
0303                         command1=['start /B psexec -W ',DyMo, ' -a ',int2str(Parallel(indPC).CPUnbr(j-nCPU0)),' -low  octave --eval "default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')"'];
0304                     else
0305                         command1=['start /B psexec -W ',DyMo, ' -a ',int2str(Parallel(indPC).CPUnbr(j-nCPU0)),' -low  ',Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')"'];
0306                     end
0307                 end
0308             else                                                            % 0.2 Parallel(indPC).Local==0: Run using network on remote machine or also on local machine.
0309                 if j==nCPU0+1,
0310                     dynareParallelSendFiles([fname,'_input.mat'],PRCDir,Parallel(indPC));
0311                     dynareParallelSendFiles(NamFileInput,PRCDir,Parallel(indPC));
0312                 end
0313                 
0314                 if (~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem)), % Hybrid computing Windows <-> Unix!
0315                     if ispc, token='start /B ';
0316                     else token = '';
0317                     end
0318                     % To manage the differences in Unix/Windows OS syntax.
0319                     remoteFile=['remoteDynare',int2str(j)];
0320                     fidRemote=fopen([remoteFile,'.m'],'w+');
0321                     if strfind([Parallel(indPC).MatlabOctavePath], 'octave'),% Hybrid computing Matlab(Master)->Octave(Slaves) and Vice Versa!
0322                         remoteString=['default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')'];
0323                         command1=[token, 'ssh ',Parallel(indPC).UserName,'@',Parallel(indPC).ComputerName,' "cd ',Parallel(indPC).RemoteDirectory,'/',PRCDir, '; octave --eval ',remoteFile,' " &'];
0324                     else
0325                         remoteString=['addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')'];
0326                         command1=[token, 'ssh ',Parallel(indPC).UserName,'@',Parallel(indPC).ComputerName,' "cd ',Parallel(indPC).RemoteDirectory,'/',PRCDir, '; ',Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r ',remoteFile,';" &'];
0327                     end
0328                     fprintf(fidRemote,'%s\n',remoteString);
0329                     fclose(fidRemote);
0330                     dynareParallelSendFiles([remoteFile,'.m'],PRCDir,Parallel(indPC));
0331                     delete([remoteFile,'.m']);
0332                 else
0333                     if ~strcmp(Parallel(indPC).ComputerName,MasterName),  % 0.3 Run on a remote machine!
0334                         % Hybrid computing Matlab(Master)-> Octave(Slaves) and Vice Versa!
0335                         if  strfind([Parallel(indPC).MatlabOctavePath], 'octave')
0336                             command1=['start /B psexec \\',Parallel(indPC).ComputerName,' -e -u ',Parallel(indPC).UserName,' -p ',Parallel(indPC).Password,' -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\ -a ',int2str(Parallel(indPC).CPUnbr(j-nCPU0)), ...
0337                                 ' -low  octave --eval "default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')"'];
0338                         else
0339                             command1=['start /B psexec \\',Parallel(indPC).ComputerName,' -e -u ',Parallel(indPC).UserName,' -p ',Parallel(indPC).Password,' -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\ -a ',int2str(Parallel(indPC).CPUnbr(j-nCPU0)), ...
0340                                 ' -low  ',Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')"'];
0341                         end
0342                     else                                                  % 0.4 Run on the local machine via the network
0343                         % Hybrid computing Matlab(Master)->Octave(Slaves) and Vice Versa!
0344                         if  strfind([Parallel(indPC).MatlabOctavePath], 'octave')
0345                             command1=['start /B psexec \\',Parallel(indPC).ComputerName,' -e -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\ -a ',int2str(Parallel(indPC).CPUnbr(j-nCPU0)), ...
0346                                 ' -low  octave --eval "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')"'];
0347                         else
0348                             command1=['start /B psexec \\',Parallel(indPC).ComputerName,' -e -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\ -a ',int2str(Parallel(indPC).CPUnbr(j-nCPU0)), ...
0349                                 ' -low  ',Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')"'];
0350                         end
0351                     end
0352                 end
0353             end
0354             
0355             
0356         case 1
0357             if Parallel(indPC).Local == 1 && newInstance,                       % 1.1 Run on the local machine.
0358                 if (~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem)),  % Hybrid computing Windows <-> Unix!
0359                     if strfind([Parallel(indPC).MatlabOctavePath], 'octave')    % Hybrid computing Matlab(Master)-> Octave(Slaves) and Vice Versa!
0360                         command1=['octave --eval "default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')" &'];
0361                     else
0362                         command1=[Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')" &'];
0363                     end
0364                 else    % Hybrid computing Matlab(Master)->Octave(Slaves) and Vice Versa!
0365                     if  strfind([Parallel(indPC).MatlabOctavePath], 'octave')
0366                         command1=['start /B psexec -W ',DyMo, ' -a ',int2str(Parallel(indPC).CPUnbr(j-nCPU0)),' -low  octave --eval "default_save_options(''-v7'');addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')"'];
0367                     else
0368                         command1=['start /B psexec -W ',DyMo, ' -a ',int2str(Parallel(indPC).CPUnbr(j-nCPU0)),' -low  ',Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')"'];
0369                     end
0370                 end
0371             elseif Parallel(indPC).Local==0,                                % 1.2 Run using network on remote machine or also on local machine.
0372                 if j==nCPU0+1,
0373                     dynareParallelSendFiles(NamFileInput,PRCDir,Parallel(indPC));
0374                 end
0375                 dynareParallelSendFiles(['P_',fname,'_',int2str(j),'End.txt'],PRCDir,Parallel(indPC));
0376                 delete(['P_',fname,'_',int2str(j),'End.txt']);
0377                 if newInstance,
0378                     dynareParallelSendFiles(['slaveJob',int2str(j),'.mat'],PRCDir,Parallel(indPC));
0379                     delete(['slaveJob',int2str(j),'.mat']);
0380                     dynareParallelSendFiles(['slaveParallel_input',int2str(j),'.mat'],PRCDir,Parallel(indPC))
0381                     if (~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem)), % Hybrid computing Windows <-> Unix!
0382                         if ispc, token='start /B ';
0383                         else token = '';
0384                         end
0385                         % To manage the differences in Unix/Windows OS syntax.
0386                         remoteFile=['remoteDynare',int2str(j)];
0387                         fidRemote=fopen([remoteFile,'.m'],'w+');
0388                         if strfind([Parallel(indPC).MatlabOctavePath], 'octave') % Hybrid computing Matlab(Master)-> Octave(Slaves) and Vice Versa!
0389                             remoteString=['default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),');'];
0390                             command1=[token, 'ssh ',Parallel(indPC).UserName,'@',Parallel(indPC).ComputerName,' "cd ',Parallel(indPC).RemoteDirectory,'/',PRCDir '; octave --eval ',remoteFile,' " &'];
0391                         else
0392                             remoteString=['addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),');'];
0393                             command1=[token, 'ssh ',Parallel(indPC).UserName,'@',Parallel(indPC).ComputerName,' "cd ',Parallel(indPC).RemoteDirectory,'/',PRCDir '; ',Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r ',remoteFile,';" &'];
0394                         end
0395                         fprintf(fidRemote,'%s\n',remoteString);
0396                         fclose(fidRemote);
0397                         dynareParallelSendFiles([remoteFile,'.m'],PRCDir,Parallel(indPC));
0398                         delete([remoteFile,'.m']);
0399                     else
0400                         if ~strcmp(Parallel(indPC).ComputerName,MasterName), % 1.3 Run on a remote machine.
0401                             % Hybrid computing Matlab(Master)->Octave(Slaves) and Vice Versa!
0402                             if  strfind([Parallel(indPC).MatlabOctavePath], 'octave')
0403                                 command1=['start /B psexec \\',Parallel(indPC).ComputerName,' -e -u ',Parallel(indPC).UserName,' -p ',Parallel(indPC).Password,' -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\ -a ',int2str(Parallel(indPC).CPUnbr(j-nCPU0)), ...
0404                                     ' -low  octave --eval "default_save_options(''-v7'');addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')"'];
0405                             else
0406                                 command1=['start /B psexec \\',Parallel(indPC).ComputerName,' -e -u ',Parallel(indPC).UserName,' -p ',Parallel(indPC).Password,' -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\ -a ',int2str(Parallel(indPC).CPUnbr(j-nCPU0)), ...
0407                                     ' -low  ',Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')"'];
0408                             end
0409                         else                                                % 1.4 Run on the local machine via the network.
0410                             % Hybrid computing Matlab(Master)->Octave(Slaves) and Vice Versa!
0411                             if  strfind([Parallel(indPC).MatlabOctavePath], 'octave')
0412                                 command1=['start /B psexec \\',Parallel(indPC).ComputerName,' -e -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\ -a ',int2str(Parallel(indPC).CPUnbr(j-nCPU0)), ...
0413                                     ' -low  octave --eval "default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')"'];
0414                             else
0415                                 command1=['start /B psexec \\',Parallel(indPC).ComputerName,' -e -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\ -a ',int2str(Parallel(indPC).CPUnbr(j-nCPU0)), ...
0416                                     ' -low  ',Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')"'];
0417                             end
0418                         end
0419                     end
0420                 else
0421                     % When the user strategy is equal to 1, you must
0422                     % do PRCDirSnapshot here to avoid problems of
0423                     % synchronization.
0424                     
0425                     if isempty(PRCDirSnapshot{indPC}),
0426                         PRCDirSnapshot(indPC)=dynareParallelSnapshot(PRCDir,Parallel(indPC));
0427                         PRCDirSnapshotInit(indPC) = PRCDirSnapshot(indPC);
0428                     else
0429                         PRCDirSnapshot(indPC)=dynareParallelGetNewFiles(PRCDir,Parallel(indPC),PRCDirSnapshot(indPC));
0430                     end
0431                     dynareParallelSendFiles(['slaveJob',int2str(j),'.mat'],PRCDir,Parallel(indPC));
0432                     delete(['slaveJob',int2str(j),'.mat']);
0433                     
0434                 end
0435             end
0436             
0437     end
0438     
0439     fprintf(fid,'%s\n',command1);
0440     
0441 end
0442 
0443 % In this way we are sure that the file 'ConcurrentCommand1.bat' is
0444 % closed, so that it can then be deleted!
0445 while (1)
0446     StatusOfCC1_bat = fclose(fid);
0447     if StatusOfCC1_bat==0
0448         break
0449     end
0450 end
0451 % Snapshot of the contents of all the directories involved in parallel
0452 % computing. This is necessary when we want to continuously copy the files produced by
0453 % the slaves ...
0454 % If the computation is 'Local' it is not necessary to do it ...
0455 
0456 if Strategy==0 || newInstance, % See above.
0457     PRCDirSnapshot=dynareParallelSnapshot(PRCDir,Parallel(1:totSlaves));
0458     PRCDirSnapshotInit = PRCDirSnapshot;
0459     
0460     % Run the slaves.
0461     if  ~ispc, %isunix || (~matlab_ver_less_than('7.4') && ismac),
0462         system('sh ConcurrentCommand1.bat &');
0463         pause(1)
0464     else
0465         
0466         if exist('OCTAVE_VERSION')
0467             % Redirect the standard output to the file 'OctaveStandardOutputMessage.txt'!
0468             % This file is saved in the Model directory.
0469             system('ConcurrentCommand1.bat > OctaveStandardOutputMessage.txt');
0470         else
0471             system('ConcurrentCommand1.bat');
0472         end
0473     end
0474 end
0475 
0476 
0477 % For the MATLAB environment with options_.console_mode = 0:
0478 % create a parallel (local/remote) specialized computational status bars!
0479 
0480 global options_
0481 
0482 
0483 % Create a parallel (local/remote) specialized computational status bars!
0484 
0485 if exist('OCTAVE_VERSION') || (options_.console_mode == 1),
0486     diary off;
0487     if exist('OCTAVE_VERSION')
0488         printf('\n');
0489     else
0490         fprintf('\n');
0491     end
0492 else
0493     hfigstatus = figure('name',['Parallel ',fname],...
0494         'DockControls','off', ...
0495         'IntegerHandle','off', ...
0496         'Interruptible','off', ...
0497         'MenuBar', 'none', ...
0498         'NumberTitle','off', ...
0499         'Renderer','Painters', ...
0500         'Resize','off');
0501     
0502     vspace = 0.1;
0503     ncol = ceil(totCPU/10);
0504     hspace = 0.9/ncol;
0505     hstatus(1) = axes('position',[0.05/ncol 0.92 0.9/ncol 0.03], ...
0506         'box','on','xtick',[],'ytick',[],'xlim',[0 1],'ylim',[0 1]);
0507     set(hstatus(1),'Units','pixels')
0508     hpixel = get(hstatus(1),'Position');
0509     hfigure = get(hfigstatus,'Position');
0510     hfigure(4)=hpixel(4)*10/3*min(10,totCPU);
0511     set(hfigstatus,'Position',hfigure)
0512     set(hstatus(1),'Units','normalized'),
0513     vspace = max(0.1,1/totCPU);
0514     vstart = 1-vspace+0.2*vspace;
0515     for j=1:totCPU,
0516         jrow = mod(j-1,10)+1;
0517         jcol = ceil(j/10);
0518         hstatus(j) = axes('position',[0.05/ncol+(jcol-1)/ncol vstart-vspace*(jrow-1) 0.9/ncol 0.3*vspace], ...
0519             'box','on','xtick',[],'ytick',[],'xlim',[0 1],'ylim',[0 1]);
0520         hpat(j) = patch([0 0 0 0],[0 1 1 0],'r','EdgeColor','r');
0521         htit(j) = title(['Initialize ...']);
0522         
0523     end
0524     
0525     cumBlockPerCPU = cumsum(nBlockPerCPU);
0526 end
0527 pcerdone = NaN(1,totCPU);
0528 idCPU = NaN(1,totCPU);
0529 
0530 delete(['comp_status_',fname,'*.mat']);
0531 
0532 
0533 % Wait for the slaves to finish their job, and display some progress
0534 % information meanwhile.
0535 
0536 % Caption for console mode computing ...
0537 
0538 if (options_.console_mode == 1) ||  exist('OCTAVE_VERSION')
0539     
0540     if ~exist('OCTAVE_VERSION')
0541         if strcmp([Parallel(indPC).MatlabOctavePath], 'octave')
0542             RjInformation='Hybrid Computing Is Active: Remote jobs are computed by Octave!';
0543             fprintf([RjInformation,'\n\n']);
0544         end
0545     end
0546     
0547     fnameTemp=fname;
0548     
0549     L=length(fnameTemp);
0550     
0551     PoCo=strfind(fnameTemp,'_core');
0552     
0553     for i=PoCo:L
0554         if i==PoCo
0555             fnameTemp(i)=' ';
0556         else
0557             fnameTemp(i)='.';
0558         end
0559     end
0560     
0561     for i=1:L
0562         if  fnameTemp(i)=='_';
0563             fnameTemp(i)=' ';
0564         end
0565     end
0566     
0567     fnameTemp(L)='';
0568     
0569     Information=['Parallel ' fnameTemp ' Computing ...'];
0570     if exist('OCTAVE_VERSION')
0571         if (~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem)) && (Strategy==0)
0572             printf('\n');
0573             pause(2);
0574         end
0575         
0576         printf([Information,'\n\n']);
0577     else
0578         fprintf([Information,'\n\n']);
0579     end
0580     
0581 end
0582 
0583 
0584 % Testing Zone
0585 
0586 % Check the new copy file strategy ...
0587 global NuoviFilecopiati
0588 NuoviFilecopiati=zeros(1,totSlaves);
0589 % End
0590 
0591 ForEver=1;
0592 statusString = '';
0593 flag_CloseAllSlaves=0;
0594 
0595 while (ForEver)
0596     
0597     waitbarString = '';
0598     statusString0 = repmat('\b',1,length(sprintf(statusString, 100 .* pcerdone)));
0599     statusString = '';
0600     
0601     pause(1)
0602     
0603     try
0604         if islocal ==0,
0605             dynareParallelGetFiles(['comp_status_',fname,'*.mat'],PRCDir,Parallel(1:totSlaves));
0606         end
0607     catch
0608     end
0609     
0610     for j=1:totCPU,
0611         try
0612             if ~isempty(['comp_status_',fname,int2str(j),'.mat'])
0613                 load(['comp_status_',fname,int2str(j),'.mat']);
0614 %                 whoCloseAllSlaves = who(['comp_status_',fname,int2str(j),'.mat','CloseAllSlaves']);
0615                 if exist('CloseAllSlaves') && flag_CloseAllSlaves==0,
0616                     flag_CloseAllSlaves=1;
0617                     whoiamCloseAllSlaves=j;
0618                     closeSlave(Parallel(1:totSlaves),PRCDir,1);
0619                 end
0620             end
0621             pcerdone(j) = prtfrc;
0622             idCPU(j) = njob;
0623             if exist('OCTAVE_VERSION') || (options_.console_mode == 1),
0624                 if (~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem))
0625                     statusString = [statusString, int2str(j), ' %3.f%% done! '];
0626                 else
0627                     statusString = [statusString, int2str(j), ' %3.f%% done! '];
0628                 end
0629             else
0630                 status_String{j} = waitbarString;
0631                 status_Title{j} = waitbarTitle;
0632             end
0633         catch % ME
0634             % To define!
0635             if exist('OCTAVE_VERSION') || (options_.console_mode == 1),
0636                 if (~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem))
0637                     statusString = [statusString, int2str(j), ' %3.f%% done! '];
0638                 else
0639                     statusString = [statusString, int2str(j), ' %3.f%% done! '];
0640                 end
0641             end
0642         end
0643     end
0644     if exist('OCTAVE_VERSION') || (options_.console_mode == 1),
0645         if exist('OCTAVE_VERSION')
0646             printf([statusString,'\r'], 100 .* pcerdone);
0647         else
0648             if ~isempty(statusString)
0649                 fprintf([statusString0,statusString], 100 .* pcerdone);
0650             end
0651         end
0652         
0653     else
0654         for j=1:totCPU,
0655             try
0656                 set(hpat(j),'XData',[0 0 pcerdone(j) pcerdone(j)]);
0657                 set(htit(j),'String',[status_Title{j},' - ',status_String{j}]);
0658             catch
0659                 
0660             end
0661         end
0662     end
0663     
0664     % Check whether the slave(s) have generated any new files remotely.
0665     % 1. The files .log and .txt are not copied.
0666     % 2. The comp_status_*.mat files are managed separately.
0667     
0668     PRCDirSnapshot=dynareParallelGetNewFiles(PRCDir,Parallel(1:totSlaves),PRCDirSnapshot);
0669     
0670     if isempty(dynareParallelDir(['P_',fname,'_*End.txt'],PRCDir,Parallel(1:totSlaves)));
0671         HoTuttiGliOutput=0;
0672         for j=1:totCPU,
0673             
0674             % Checking if the remote computation is finished and if we copied all the output here.
0675             if ~isempty(dir([fname,'_output_',int2str(j),'.mat']))
0676                 HoTuttiGliOutput=HoTuttiGliOutput+1;
0677             end
0678         end
0679         
0680         if HoTuttiGliOutput==totCPU,
0681             mydelete(['comp_status_',fname,'*.mat']);
0682             if exist('OCTAVE_VERSION')|| (options_.console_mode == 1),
0683                 if exist('OCTAVE_VERSION')
0684                     printf('\n');
0685                     printf(['End Parallel Session ....','\n\n']);
0686                 else
0687                     fprintf('\n');
0688                     fprintf(['End Parallel Session ....','\n\n']);
0689                 end
0690                 diary on;
0691             else
0692                 close(hfigstatus),
0693             end
0694             
0695             break
0696         else
0697             disp('Waiting for output files from slaves ...')
0698         end
0699     end
0700     
0701 end
0702 
0703 
0704 % Load and format remote output.
0705 iscrash = 0;
0706 
0707 for j=1:totCPU,
0708     indPC=min(find(nCPU>=j));
0709     %   Already done above.
0710     %   dynareParallelGetFiles([fname,'_output_',int2str(j),'.mat'],PRCDir,Parallel(indPC));
0711     load([fname,'_output_',int2str(j),'.mat'],'fOutputVar');
0712     delete([fname,'_output_',int2str(j),'.mat']);
0713     if isfield(fOutputVar,'OutputFileName'),
0714         %   Already done above
0715         %   dynareParallelGetFiles([fOutputVar.OutputFileName],PRCDir,Parallel(indPC));
0716     end
0717     if isfield(fOutputVar,'error'),
0718         disp(['Job number ',int2str(j),' crashed with error:']);
0719         iscrash=1;
0720         disp([fOutputVar.error.message]);
0721         for jstack=1:length(fOutputVar.error.stack)
0722             fOutputVar.error.stack(jstack),
0723         end
0724     elseif flag_CloseAllSlaves==0,
0725         fOutVar(j)=fOutputVar;
0726     elseif j==whoiamCloseAllSlaves,
0727         fOutVar=fOutputVar;        
0728     end
0729 end
0730 
0731 if flag_CloseAllSlaves==1,
0732     closeSlave(Parallel(1:totSlaves),PRCDir,-1);
0733 end
0734 
0735 if iscrash,
0736     error('Remote jobs crashed');
0737 end
0738 
0739 pause(1), % Wait for all remote diary off completed
0740 
0741 % Cleanup.
0742 dynareParallelGetFiles('*.log',PRCDir,Parallel(1:totSlaves));
0743 
0744 switch Strategy
0745     case 0
0746         for indPC=1:length(Parallel)
0747             if Parallel(indPC).Local == 0
0748                 dynareParallelRmDir(PRCDir,Parallel(indPC));
0749             end
0750             
0751             if isempty(dir('dynareParallelLogFiles'))
0752                 [A B C]=rmdir('dynareParallelLogFiles');
0753                 mkdir('dynareParallelLogFiles');
0754             end
0755             try
0756                 copyfile('*.log','dynareParallelLogFiles');
0757                 mydelete([fname,'*.log']);
0758             catch
0759             end
0760             mydelete(['*_core*_input*.mat']);
0761             %             if Parallel(indPC).Local == 1
0762             %                 delete(['slaveParallel_input*.mat']);
0763             %             end
0764             
0765         end
0766         
0767         delete ConcurrentCommand1.bat
0768     case 1
0769         delete(['temp_input.mat'])
0770         if newInstance,
0771             if isempty(dir('dynareParallelLogFiles'))
0772                 [A B C]=rmdir('dynareParallelLogFiles');
0773                 mkdir('dynareParallelLogFiles');
0774             end
0775         end
0776         copyfile('*.log','dynareParallelLogFiles');
0777         if newInstance,
0778             delete ConcurrentCommand1.bat
0779         end
0780 end
0781 
0782 
0783 
0784

Generated on Tue 22-May-2012 02:40:23 by m2html © 2005