Package edu.isi.pegasus.planner.refiner
Class TransferEngine
- java.lang.Object
-
- edu.isi.pegasus.planner.refiner.Engine
-
- edu.isi.pegasus.planner.refiner.TransferEngine
-
public class TransferEngine extends Engine
The transfer engine, which, on the basis of the pools on which the jobs are to run, adds nodes to transfer the data products.
- Version:
- $Revision$
- Author:
- Karan Vahi, Gaurang Mehta
-
-
Field Summary
Fields
- static int DELETED_JOBS_LEVEL
  The MAX level is assigned as the level for deleted jobs.
- private boolean mBypassStagingForInputs
  A boolean indicating whether to bypass first level staging for inputs.
- private ADag mDag
  The DAG object to which the transfer nodes are to be added.
- private java.util.List<Job> mDeletedJobs
  Holds all the jobs deleted by the reduction algorithm.
- private org.griphyn.vdl.euryale.FileFactory mFactory
  The handle to the file factory that is used to create the top level directories for each of the partitions.
- private OutputMapper mOutputMapper
  Handle to an OutputMapper that tells where output files are to be placed.
- private java.lang.String mOutputSite
  The output site where files need to be staged to.
- private PlannerCache mPlannerCache
  A SimpleFile Replica Catalog that tracks all the files being materialized as part of workflow execution.
- private PlannerOptions mPlannerOptions
  The planner options passed to the planner.
- private ReplicaCatalogBridge mRCBridge
  The bridge to the Replica Catalog.
- private ReplicaSelector mReplicaSelector
  The handle to the replica selector that is to be used to select among the various replicas.
- private boolean mSetupForCondorIO
  A boolean tracking whether Condor file IO is used for the workflow.
- private java.util.Map<java.lang.String,NameValue> mSRMServiceURLToMountPointMap
  A map that associates the site name with the SRM server URL and mount point.
- private Refiner mTXRefiner
  The handle to the transfer refiner that adds the transfer nodes into the workflow.
- protected boolean mUseSymLinks
  If set, the destination URL for symlink jobs uses the symlink:// scheme when the pool attribute associated with the PFN is the same as a job's execution pool.
- protected java.lang.String mWorkDir
  The working directory relative to the mount point of the execution pool.
- private boolean mWorkerNodeExecution
  A boolean indicating whether we are doing worker node execution.
- private ReplicaCatalog mWorkflowCache
  A Replica Catalog that tracks all the GET URLs for the files on the staging sites.
- static java.lang.String REFINER_NAME
  The name of the refiner for purposes of error logging.
- static java.lang.String SRM_MOUNT_POINT_PROPERTIES_SUFFIX
  The suffix to retrieve the mount point for the SRM server.
- static java.lang.String SRM_PROPERTIES_PREFIX
  The property prefix for retrieving SRM properties.
- static java.lang.String SRM_SERVICE_URL_PROPERTIES_SUFFIX
  The suffix to retrieve the service URL for the SRM server.
- static java.lang.String WORKFLOW_CACHE_FILE_IMPLEMENTOR
  The name of the Replica Catalog Implementer that is used to write out the workflow cache file in the submit directory.
- static java.lang.String WORKFLOW_CACHE_REPLICA_CATALOG_KEY
  The name of the source key for the Replica Catalog Implementer that serves as a cache.
-
Fields inherited from class edu.isi.pegasus.planner.refiner.Engine
mBag, mLogger, mLogMsg, mOutputPool, mPoolFile, mPOptions, mProps, mRLIUrl, mSiteStore, mTCFile, mTCHandle, mTCMode, REGISTRATION_UNIVERSE, TRANSFER_UNIVERSE
-
-
Constructor Summary
Constructors
- TransferEngine(ADag reducedDag, PegasusBag bag, java.util.List<Job> deletedJobs, java.util.List<Job> deletedLeafJobs)
  Overloaded constructor.
-
Method Summary
All Methods | Instance Methods | Concrete Methods
- void addTransferNodes(ReplicaCatalogBridge rcb, PlannerCache plannerCache)
  Adds the transfer nodes to the workflow.
- private boolean bypassStagingForInputFile(ReplicaCatalogEntry entry, PegasusFile file, java.lang.String computeSite)
  Returns a boolean indicating whether to bypass first level staging for a file.
- private void complainForScratchFileServer(Job job, FileServerType.OPERATION operation, java.lang.String site)
  Complains for a missing head node file server on a site for a job.
- private void complainForScratchFileServer(java.lang.String jobname, FileServerType.OPERATION operation, java.lang.String site)
  Complains for a missing head node file server on a site for a job.
- private FileTransfer constructFileTX(PegasusFile pf, Job job, java.lang.String destSiteHandle, java.lang.String path, boolean localTransfer)
  Constructs the FileTransfer object on the basis of the transiency information.
- private java.lang.String constructRegistrationURL(java.lang.String site, java.lang.String lfn)
  Constructs a registration URL for an LFN.
- private java.util.Map<java.lang.String,NameValue> constructSiteToSRMServerMap(PegasusProperties props)
  Constructs a Properties object by parsing the relevant SRM Pegasus properties.
- private java.lang.String getCacheFileName(ADag adag)
  Constructs the basename of the cache file that is used to log the transient files.
- private java.util.Vector getDeletedFileTX(java.lang.String pool, Job job)
  Gets the file transfer objects corresponding to the locations of files found in the replica mechanism, and transfers them to the output pool requested by the user.
- private void getFilesFromRC(DAGJob job, java.util.Collection searchFiles)
  Special handling for a DAGJob for retrieving files from the Replica Catalog.
- private void getFilesFromRC(DAXJob job, java.util.Collection searchFiles)
  Special handling for a DAXJob for retrieving files from the Replica Catalog.
- private void getFilesFromRC(Job job, java.util.Collection searchFiles)
  Looks up the RCEngine Hashtable for the locations of the files and adds nodes to transfer them.
- private java.util.Vector getFileTX(java.lang.String destPool, Job job, boolean localTransfer)
  Gets the Vector of FileTransfer objects for the files that have to be transferred to one destination pool.
- private java.util.Collection<FileTransfer>[] getInterpoolFileTX(Job job, java.util.Collection<GraphNode> parents)
  Gets the FileTransfer objects for all the files that have to be transferred to the destination pool in the case of interpool transfers.
- private java.util.Set<PegasusFile> getOutputFiles(java.util.Collection<GraphNode> nodes)
  Gets the output files for all the nodes that are passed in.
- java.lang.String getStagingSite(Job job)
  Returns the staging site to be used for a job.
- private java.lang.String getURLOnSharedScratch(SiteCatalogEntry entry, Job job, FileServerType.OPERATION operation, java.lang.String lfn)
  Returns a URL on the shared scratch of the staging site.
- private ReplicaCatalog initializeWorkflowCacheFile(ADag dag)
  Initializes a Replica Catalog instance that is used to store the GET URLs for all files on the staging site (inputs staged and outputs created).
- private void logRemoval(Job job, PegasusFile file, java.lang.String prefix, boolean removed)
  Helper method for logging removal messages.
- private java.lang.String poolNotFoundMsg(java.lang.String poolName, java.lang.String universe)
  Generates an error message for a pool not found in the pool config file.
- private void processParents(Job job, java.util.Collection<GraphNode> parents)
  Processes a node's parents and determines whether transfer nodes are to be added.
- protected java.lang.String replaceProtocolFromURL(java.lang.String pfn)
  Replaces the gsiftp URL scheme in the URL with the symlink URL scheme and returns the result in a new object.
- protected ReplicaCatalogEntry replaceSourceProtocolFromURL(ReplicaCatalogEntry rce)
  Replaces the SRM URL scheme in the URL with the file URL scheme and returns the result in a new object if replacement happens.
- boolean runTransferOnLocalSite(java.lang.String site, java.lang.String destinationURL, int type)
  Returns whether to run a transfer job on the local site.
- private void trackInCaches(Job job)
  Tracks the files created by a job in both the planner and workflow caches.
- private void trackInPlannerCache(java.lang.String lfn, java.lang.String pfn, java.lang.String site)
  Inserts an entry into the planner cache as a put URL.
- private void trackInPlannerCache(java.lang.String lfn, java.lang.String pfn, java.lang.String site, FileServerType.OPERATION type)
  Inserts an entry into the planner cache as a put URL.
- private void trackInWorkflowCache(java.lang.String lfn, java.lang.String pfn, java.lang.String site)
  Inserts an entry into the workflow cache that is to be written out to the submit directory.
-
Methods inherited from class edu.isi.pegasus.planner.refiner.Engine
addVector, appendArrayList, complainForHeadNodeURLPrefix, complainForHeadNodeURLPrefix, loadProperties, printVector, stringInList, stringInPegVector, stringInVector, vectorToString
-
-
-
-
Field Detail
-
DELETED_JOBS_LEVEL
public static final int DELETED_JOBS_LEVEL
The MAX level is assigned as the level for deleted jobs. We could set it to Integer.MAX_VALUE, but it is rare for the number of levels in a workflow to exceed 1000.
- See Also:
- Constant Field Values
-
WORKFLOW_CACHE_FILE_IMPLEMENTOR
public static final java.lang.String WORKFLOW_CACHE_FILE_IMPLEMENTOR
The name of the Replica Catalog Implementer that is used to write out the workflow cache file in the submit directory.
- See Also:
- Constant Field Values
-
WORKFLOW_CACHE_REPLICA_CATALOG_KEY
public static final java.lang.String WORKFLOW_CACHE_REPLICA_CATALOG_KEY
The name of the source key for the Replica Catalog Implementer that serves as a cache.
- See Also:
- Constant Field Values
-
SRM_PROPERTIES_PREFIX
public static final java.lang.String SRM_PROPERTIES_PREFIX
The property prefix for retrieving SRM properties.
- See Also:
- Constant Field Values
-
SRM_SERVICE_URL_PROPERTIES_SUFFIX
public static final java.lang.String SRM_SERVICE_URL_PROPERTIES_SUFFIX
The suffix to retrieve the service URL for the SRM server.
- See Also:
- Constant Field Values
-
SRM_MOUNT_POINT_PROPERTIES_SUFFIX
public static final java.lang.String SRM_MOUNT_POINT_PROPERTIES_SUFFIX
The suffix to retrieve the mount point for the SRM server.
- See Also:
- Constant Field Values
-
REFINER_NAME
public static final java.lang.String REFINER_NAME
The name of the refiner for purposes of error logging.
- See Also:
- Constant Field Values
-
mSRMServiceURLToMountPointMap
private java.util.Map<java.lang.String,NameValue> mSRMServiceURLToMountPointMap
A map that associates the site name with the SRM server url and mount point.
-
mDag
private ADag mDag
The DAG object to which the transfer nodes are to be added. This is the reduced DAG, which is obtained from the Reduction Engine.
-
mRCBridge
private ReplicaCatalogBridge mRCBridge
The bridge to the Replica Catalog.
-
mReplicaSelector
private ReplicaSelector mReplicaSelector
The handle to the replica selector that is to be used to select among the various replicas.
-
mTXRefiner
private Refiner mTXRefiner
The handle to the transfer refiner that adds the transfer nodes into the workflow.
-
mDeletedJobs
private java.util.List<Job> mDeletedJobs
Holds all the jobs deleted by the reduction algorithm.
-
mPlannerCache
private PlannerCache mPlannerCache
A SimpleFile Replica Catalog that tracks all the files being materialized as part of workflow execution.
-
mWorkflowCache
private ReplicaCatalog mWorkflowCache
A Replica Catalog that tracks all the GET URLs for the files on the staging sites.
-
mFactory
private org.griphyn.vdl.euryale.FileFactory mFactory
The handle to the file factory that is used to create the top level directories for each of the partitions.
-
mOutputMapper
private OutputMapper mOutputMapper
Handle to an OutputMapper that tells where output files are to be placed.
-
mWorkDir
protected java.lang.String mWorkDir
The working directory relative to the mount point of the execution pool. It is populated from the pegasus.dir.exec property in the properties file. If not specified, the working directory defaults to the exec mount point of the execution pool.
-
mUseSymLinks
protected boolean mUseSymLinks
If this member variable is set, the destination URL for symlink jobs uses the symlink:// scheme when the pool attribute associated with the PFN is the same as a job's execution pool.
-
mWorkerNodeExecution
private boolean mWorkerNodeExecution
A boolean indicating whether we are doing worker node execution or not.
-
mPlannerOptions
private PlannerOptions mPlannerOptions
The planner options passed to the planner.
-
mBypassStagingForInputs
private boolean mBypassStagingForInputs
A boolean indicating whether to bypass first level staging for inputs.
-
mSetupForCondorIO
private final boolean mSetupForCondorIO
A boolean tracking whether Condor file IO is used for the workflow.
-
mOutputSite
private final java.lang.String mOutputSite
The output site where files need to be staged to.
-
-
Constructor Detail
-
TransferEngine
public TransferEngine(ADag reducedDag, PegasusBag bag, java.util.List<Job> deletedJobs, java.util.List<Job> deletedLeafJobs)
Overloaded constructor.
- Parameters:
- reducedDag - the reduced workflow.
- bag - the bag of initialization objects.
- deletedJobs - list of all jobs deleted by the reduction algorithm.
- deletedLeafJobs - list of leaf jobs deleted by the reduction algorithm.
-
-
Method Detail
-
runTransferOnLocalSite
public boolean runTransferOnLocalSite(java.lang.String site, java.lang.String destinationURL, int type)
Returns whether to run a transfer job on the local site.
- Parameters:
- site - the site handle associated with the destination URL.
- destinationURL - the destination URL.
- type - the type of transfer job for which the URL is being constructed.
- Returns:
- boolean indicating whether the associated transfer job should run on the local site.
-
addTransferNodes
public void addTransferNodes(ReplicaCatalogBridge rcb, PlannerCache plannerCache)
Adds the transfer nodes to the workflow.
- Parameters:
- rcb - the bridge to the Replica Catalog.
- plannerCache - an instance of the replica catalog that will store the locations of the files on the remote sites.
-
getStagingSite
public java.lang.String getStagingSite(Job job)
Returns the staging site to be used for a job. If a staging site is not determined from the options, it is set to the execution site for the job.
- Parameters:
- job - the job for which to determine the staging site.
- Returns:
- the staging site
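The fallback rule described above can be sketched in isolation. The map-based options lookup below is an assumption standing in for the real PlannerOptions API, not the planner's actual code:

```java
import java.util.Map;

class StagingSiteSketch {
    // Hypothetical standalone mirror of the lookup described above: a
    // staging site explicitly mapped in the planner options wins;
    // otherwise the job's execution site doubles as the staging site.
    static String stagingSite(Map<String, String> optionsMapping, String executionSite) {
        return optionsMapping.getOrDefault(executionSite, executionSite);
    }
}
```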
-
getDeletedFileTX
private java.util.Vector getDeletedFileTX(java.lang.String pool, Job job)
Gets the file transfer objects corresponding to the locations of files found in the replica mechanism, and transfers them to the output pool requested by the user. If the output pool path and the path returned by the replica mechanism match, then that object is not transferred.
- Parameters:
- pool - the output pool which the user specifies at runtime.
- job - the Job object corresponding to the leaf job which was deleted by the reduction algorithm.
- Returns:
- Vector of FileTransfer objects
-
processParents
private void processParents(Job job, java.util.Collection<GraphNode> parents)
Processes a node's parents and determines whether transfer nodes are to be added. All the input files for the job are searched for in the output files of the parent nodes and in the Replica Mechanism.
- Parameters:
- job - the Job object containing all the details of the job.
- parents - list of GraphNode objects corresponding to the parent jobs of the job.
-
getFileTX
private java.util.Vector getFileTX(java.lang.String destPool, Job job, boolean localTransfer)
Gets the Vector of FileTransfer objects for the files that have to be transferred to one destination pool. It checks the transient flags for files. If the transfer transient flag is set, the file does not have to be transferred to the destination pool.
- Parameters:
- destPool - the pool to which the files are to be transferred.
- job - the Job object of the job whose output files are needed at the destination pool.
- localTransfer - boolean indicating that the associated transfer job will run on the local site.
- Returns:
- Vector of FileTransfer objects
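The transient-flag filtering described above can be illustrated with a standalone sketch; FileStub below is a hypothetical stand-in for PegasusFile, not the real class:

```java
import java.util.ArrayList;
import java.util.List;

class TransientFilterSketch {
    // Minimal mirror of the transient-flag check described above: files
    // whose transfer flag marks them as transient are not shipped to the
    // destination pool.
    record FileStub(String lfn, boolean transferTransient) {}

    static List<String> filesToTransfer(List<FileStub> outputs) {
        List<String> result = new ArrayList<>();
        for (FileStub f : outputs) {
            if (!f.transferTransient()) {
                result.add(f.lfn());
            }
        }
        return result;
    }
}
```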
-
constructFileTX
private FileTransfer constructFileTX(PegasusFile pf, Job job, java.lang.String destSiteHandle, java.lang.String path, boolean localTransfer)
Constructs the FileTransfer object on the basis of the transiency information. If the transient flag for transfer is set, the destPutURL for the FileTransfer object will be the execution directory, as this is the entry that has to be registered in the Replica Mechanism.
- Parameters:
- pf - the PegasusFile for which the transfer has to be done.
- job - the associated job.
- destSiteHandle - the output pool to which the file should be transferred.
- path - the path that a user specifies in the profile for the key remote_initialdir, which results in the workdir being changed for a job on an execution pool.
- localTransfer - boolean indicating that the associated transfer job will run on the local site.
- Returns:
- the corresponding FileTransfer object
-
constructRegistrationURL
private java.lang.String constructRegistrationURL(java.lang.String site, java.lang.String lfn)
Constructs a registration URL for an LFN.
- Parameters:
- site - the site handle.
- lfn - the LFN for which the URL needs to be constructed.
- Returns:
- the URL
-
poolNotFoundMsg
private java.lang.String poolNotFoundMsg(java.lang.String poolName, java.lang.String universe)
Generates an error message for a pool not found in the pool config file.
- Parameters:
- poolName - the name of the pool that is not found.
- universe - the condor universe.
- Returns:
- the message.
-
getInterpoolFileTX
private java.util.Collection<FileTransfer>[] getInterpoolFileTX(Job job, java.util.Collection<GraphNode> parents)
Gets the FileTransfer objects for all the files that have to be transferred to the destination pool in the case of interpool transfers. Each FileTransfer object has the source and the destination URLs. The source URL is determined from the pool on which the jobs are executed.
- Parameters:
- job - the job with reference to which interpool file transfers need to be determined.
- parents - Collection of GraphNode objects corresponding to the parent jobs of the job.
- Returns:
- array of Collections of FileTransfer objects
-
getFilesFromRC
private void getFilesFromRC(DAGJob job, java.util.Collection searchFiles)
Special handling for a DAGJob for retrieving files from the Replica Catalog.
- Parameters:
- job - the DAGJob.
- searchFiles - files that need to be looked up in the Replica Catalog.
-
getFilesFromRC
private void getFilesFromRC(DAXJob job, java.util.Collection searchFiles)
Special handling for a DAXJob for retrieving files from the Replica Catalog.
- Parameters:
- job - the DAXJob.
- searchFiles - files that need to be looked up in the Replica Catalog.
-
getFilesFromRC
private void getFilesFromRC(Job job, java.util.Collection searchFiles)
Looks up the RCEngine Hashtable for the locations of the files and adds nodes to transfer them. If a file is not found in the Replica Catalog, the Transfer Engine flags an error and exits.
- Parameters:
- job - the Job object whose input files have to be searched for in the Replica Mechanism.
- searchFiles - Vector containing the PegasusFile objects corresponding to the files that need to have their mappings looked up in the Replica Mechanism.
-
replaceSourceProtocolFromURL
protected ReplicaCatalogEntry replaceSourceProtocolFromURL(ReplicaCatalogEntry rce)
Replaces the SRM URL scheme in the URL with the file URL scheme and returns the result in a new object if replacement happens. The original object passed as a parameter remains unchanged.
- Parameters:
- rce - the ReplicaCatalogEntry object whose URL needs to be replaced.
- Returns:
- the object with the URL replaced.
-
replaceProtocolFromURL
protected java.lang.String replaceProtocolFromURL(java.lang.String pfn)
Replaces the gsiftp URL scheme in the URL with the symlink URL scheme and returns the result in a new object. The original object passed as a parameter remains unchanged.
- Parameters:
- pfn - the PFN that needs to be replaced.
- Returns:
- the replaced PFN
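A minimal standalone illustration of the scheme swap, assuming a simple prefix replacement; the real method may handle more URL variants than this sketch does:

```java
class SymlinkURLSketch {
    // Illustrative reimplementation of the scheme swap described above:
    // a gsiftp:// PFN becomes a symlink:// PFN; anything else is
    // returned untouched.
    static String replaceProtocol(String pfn) {
        final String from = "gsiftp://";
        if (pfn.startsWith(from)) {
            return "symlink://" + pfn.substring(from.length());
        }
        return pfn;
    }
}
```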
-
constructSiteToSRMServerMap
private java.util.Map<java.lang.String,NameValue> constructSiteToSRMServerMap(PegasusProperties props)
Constructs a Properties object by parsing the relevant SRM Pegasus properties. For example, if users have the following specified in the properties file:

pegasus.transfer.srm.ligo-cit.service.url srm://osg-se.ligo.caltech.edu:10443/srm/v2/server?SFN=/mnt/hadoop
pegasus.transfer.srm.ligo-cit.service.mountpoint /mnt/hadoop

then a Map is created that associates ligo-cit with a NameValue object containing the service URL and mount point.
- Parameters:
- props - the PegasusProperties object.
- Returns:
- Map that maps a site name to a NameValue object containing the URL prefix and the mount point
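The property scan can be sketched against plain java.util.Properties. SitePair below is a hypothetical stand-in for the Pegasus NameValue class, and the pairing logic is an assumption based on the prefix and suffixes documented above:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class SrmPropertySketch {
    // Hypothetical stand-in for the Pegasus NameValue class.
    record SitePair(String serviceUrl, String mountPoint) {}

    static final String PREFIX = "pegasus.transfer.srm.";
    static final String URL_SUFFIX = ".service.url";
    static final String MOUNT_SUFFIX = ".service.mountpoint";

    // Scan all properties, extract the site name between the prefix and
    // the suffix, and pair up the service URL with its mount point.
    static Map<String, SitePair> siteToSrmServer(Properties props) {
        Map<String, String> urls = new HashMap<>();
        Map<String, String> mounts = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(PREFIX)) continue;
            if (key.endsWith(URL_SUFFIX)) {
                String site = key.substring(PREFIX.length(), key.length() - URL_SUFFIX.length());
                urls.put(site, props.getProperty(key));
            } else if (key.endsWith(MOUNT_SUFFIX)) {
                String site = key.substring(PREFIX.length(), key.length() - MOUNT_SUFFIX.length());
                mounts.put(site, props.getProperty(key));
            }
        }
        Map<String, SitePair> result = new HashMap<>();
        for (String site : urls.keySet()) {
            if (mounts.containsKey(site)) {
                result.put(site, new SitePair(urls.get(site), mounts.get(site)));
            }
        }
        return result;
    }
}
```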
-
getOutputFiles
private java.util.Set<PegasusFile> getOutputFiles(java.util.Collection<GraphNode> nodes)
Gets the output files for all the nodes that are passed in.
- Parameters:
- nodes - List containing the jobs.
- Returns:
- Set of PegasusFile objects
-
trackInCaches
private void trackInCaches(Job job)
Tracks the files created by a job in both the planner and workflow caches. The planner cache stores the PUT URLs and the workflow cache stores the GET URLs.
- Parameters:
- job - the job whose files need to be tracked.
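The dual bookkeeping can be illustrated with plain maps standing in for the planner and workflow catalog classes; the shape of the entries is an assumption for illustration only:

```java
import java.util.HashMap;
import java.util.Map;

class CacheTrackingSketch {
    // Standalone illustration of the dual bookkeeping described above:
    // the planner cache records PUT URLs, the workflow cache GET URLs.
    static final Map<String, String> plannerCache = new HashMap<>();  // lfn -> put URL
    static final Map<String, String> workflowCache = new HashMap<>(); // lfn -> get URL

    static void track(String lfn, String putUrl, String getUrl) {
        plannerCache.put(lfn, putUrl);
        workflowCache.put(lfn, getUrl);
    }
}
```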
-
trackInPlannerCache
private void trackInPlannerCache(java.lang.String lfn, java.lang.String pfn, java.lang.String site)
Inserts an entry into the planner cache as a put URL.
- Parameters:
- lfn - the logical name of the file.
- pfn - the PFN.
- site - the site handle.
-
trackInPlannerCache
private void trackInPlannerCache(java.lang.String lfn, java.lang.String pfn, java.lang.String site, FileServerType.OPERATION type)
Inserts an entry into the planner cache as a put URL.
- Parameters:
- lfn - the logical name of the file.
- pfn - the PFN.
- site - the site handle.
- type - the type of URL.
-
trackInWorkflowCache
private void trackInWorkflowCache(java.lang.String lfn, java.lang.String pfn, java.lang.String site)
Inserts an entry into the workflow cache that is to be written out to the submit directory.
- Parameters:
- lfn - the logical name of the file.
- pfn - the PFN.
- site - the site handle.
-
getURLOnSharedScratch
private java.lang.String getURLOnSharedScratch(SiteCatalogEntry entry, Job job, FileServerType.OPERATION operation, java.lang.String lfn)
Returns a URL on the shared scratch of the staging site.
- Parameters:
- entry - the SiteCatalogEntry for the associated staging site.
- job - the job.
- operation - the FileServer operation for which we need the URL.
- lfn - the LFN; can be null to get the path to the directory.
- Returns:
- the URL
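A sketch of the URL construction, assuming it concatenates the file server URL prefix with the shared scratch directory and, when given, the LFN; the real method resolves both pieces from the SiteCatalogEntry:

```java
class ScratchURLSketch {
    // Hypothetical mirror of the behavior described above: with a null
    // LFN the path to the scratch directory itself is returned.
    static String urlOnSharedScratch(String urlPrefix, String scratchDir, String lfn) {
        String base = urlPrefix + scratchDir;
        return (lfn == null) ? base : base + "/" + lfn;
    }
}
```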
-
complainForScratchFileServer
private void complainForScratchFileServer(Job job, FileServerType.OPERATION operation, java.lang.String site)
Complains for a missing head node file server on a site for a job.
- Parameters:
- job - the job.
- operation - the operation.
- site - the site.
-
complainForScratchFileServer
private void complainForScratchFileServer(java.lang.String jobname, FileServerType.OPERATION operation, java.lang.String site)
Complains for a missing head node file server on a site for a job.
- Parameters:
- jobname - the name of the job.
- operation - the file server operation.
- site - the site.
-
initializeWorkflowCacheFile
private ReplicaCatalog initializeWorkflowCacheFile(ADag dag)
Initializes a Replica Catalog instance that is used to store the GET URLs for all files on the staging site (inputs staged and outputs created).
- Parameters:
- dag - the workflow being planned.
- Returns:
- handle to the transient catalog
-
getCacheFileName
private java.lang.String getCacheFileName(ADag adag)
Constructs the basename of the cache file that is used to log the transient files. The basename depends on whether a basename prefix has been specified at runtime.
- Parameters:
- adag - the ADag object containing the workflow that is being concretized.
- Returns:
- the name of the cache file
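A guess at the naming rule, assuming the runtime basename prefix wins when present and the workflow label is the fallback; the planner's exact rule may differ:

```java
class CacheFileNameSketch {
    // Illustrative approximation of the naming rule described above:
    // a runtime basename prefix, when supplied, overrides the workflow
    // label, and a ".cache" suffix is appended.
    static String cacheFileName(String basenamePrefix, String dagLabel) {
        String base = (basenamePrefix != null) ? basenamePrefix : dagLabel;
        return base + ".cache";
    }
}
```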
-
bypassStagingForInputFile
private boolean bypassStagingForInputFile(ReplicaCatalogEntry entry, PegasusFile file, java.lang.String computeSite)
Returns a boolean indicating whether to bypass first level staging for a file.
- Parameters:
- entry - a ReplicaCatalogEntry matching the selected replica location.
- file - the corresponding PegasusFile object.
- computeSite - the compute site where the associated job will run.
- Returns:
- boolean indicating whether to enable bypass
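One plausible approximation of the bypass decision: staging can be skipped when bypass is enabled and the selected replica already resides on the compute site. The real method applies more checks (URL scheme, file attributes) that this sketch does not reproduce:

```java
class BypassSketch {
    // Hypothetical approximation only: bypass first level staging when
    // the feature is enabled and the replica's site matches the site
    // where the job will run.
    static boolean bypassStaging(String replicaSite, String computeSite, boolean bypassEnabled) {
        return bypassEnabled && computeSite.equals(replicaSite);
    }
}
```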
-
logRemoval
private void logRemoval(Job job, PegasusFile file, java.lang.String prefix, boolean removed)
Helper method for logging removal messages. If removed is true, the message is logged at debug level; otherwise it is logged as a warning.
- Parameters:
- job - the job.
- file - the file to be removed.
- prefix - prefix for the log message.
- removed - whether removal was successful.
-
-