"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "clc/modules/object-storage/src/main/java/com/eucalyptus/objectstorage/asynctask/BucketReaperTask.java" between
eucalyptus-4.4.1.tar.gz and eucalyptus-4.4.2.tar.gz

About: Eucalyptus (Elastic Utility Computing Architecture for Linking Your Programs To Useful Systems) is an infrastructure for implementing "cloud computing" on clusters (compatible with Amazon’s EC2 interface, but designed to support multiple client-side interfaces).

BucketReaperTask.java  (eucalyptus-4.4.1):BucketReaperTask.java  (eucalyptus-4.4.2)
skipping to change at line 25 skipping to change at line 25
* *
* Please contact Eucalyptus Systems, Inc., 6755 Hollister Ave., Goleta * Please contact Eucalyptus Systems, Inc., 6755 Hollister Ave., Goleta
* CA 93117, USA or visit http://www.eucalyptus.com/licenses/ if you need * CA 93117, USA or visit http://www.eucalyptus.com/licenses/ if you need
* additional information or have any questions. * additional information or have any questions.
*/ */
package com.eucalyptus.objectstorage.asynctask; package com.eucalyptus.objectstorage.asynctask;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Level;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import com.eucalyptus.objectstorage.BucketMetadataManagers; import com.eucalyptus.objectstorage.BucketMetadataManagers;
import com.eucalyptus.objectstorage.BucketState; import com.eucalyptus.objectstorage.BucketState;
import com.eucalyptus.objectstorage.ObjectMetadataManagers; import com.eucalyptus.objectstorage.ObjectMetadataManagers;
import com.eucalyptus.objectstorage.OsgBucketFactory; import com.eucalyptus.objectstorage.OsgBucketFactory;
import com.eucalyptus.objectstorage.PaginatedResult; import com.eucalyptus.objectstorage.PaginatedResult;
import com.eucalyptus.objectstorage.entities.Bucket; import com.eucalyptus.objectstorage.entities.Bucket;
import com.eucalyptus.objectstorage.entities.ObjectEntity; import com.eucalyptus.objectstorage.entities.ObjectEntity;
import com.eucalyptus.objectstorage.entities.ObjectStorageGlobalConfiguration; import com.eucalyptus.objectstorage.entities.ObjectStorageGlobalConfiguration;
import com.eucalyptus.objectstorage.providers.ObjectStorageProviders; import com.eucalyptus.objectstorage.providers.ObjectStorageProviders;
import com.eucalyptus.storage.config.ConfigurationCache; import com.eucalyptus.storage.config.ConfigurationCache;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
/** /**
* Scans metadata for each objects in a bucket and cleans history for each Many * Scans metadata for each objects in a bucket and cleans history for each.
of these may be running concurrently. Has a self-imposed timeout of 30
* seconds.
*
* This should be more than sufficient given that it only manipulates metadata a
nd never interacts with the backend.
*
*/ */
public class BucketReaperTask implements Runnable { public class BucketReaperTask {
private static final Logger LOG = Logger.getLogger(BucketReaperTask.class); private static final Logger LOG = Logger.getLogger(BucketReaperTask.class);
private long startTime; private static final long TASK_INTERVAL = 30 * 1000; // 30 seconds
private static final long MAX_TASK_DURATION = 30 * 1000; // 30 seconds private static final long TIMEOUT_LOG_INTERVAL = TimeUnit.HOURS.toMillis( 1 );
private static final AtomicLong TIMEOUT_LOG_TOKEN = new AtomicLong( );
private static final Random rand = new Random(System.currentTimeMillis()); private static final Random rand = new Random(System.currentTimeMillis());
private boolean interrupted = false;
private volatile long startTime;
private volatile long timeoutTime;
private volatile boolean interrupted = false;
public BucketReaperTask() {} public BucketReaperTask() {}
// Does a single scan of all objects in the bucket and does history cleanup on each // Does a single scan of all objects in the bucket and does history cleanup on each
@Override public void run( final long nextRun ) {
public void run() { startTime = System.currentTimeMillis( );
startTime = System.currentTimeMillis(); timeoutTime = nextRun - TASK_INTERVAL;
Supplier<Level> timeoutLevelSupplier = Suppliers.memoize(() -> logTimeout( )
? Level.WARN : Level.TRACE);
int bucketsResolved = 0;
int bucketsCleaned = 0;
boolean cleaningStarted = false;
try { try {
LOG.trace("Initiating bucket cleanup task"); LOG.trace("Initiating bucket cleanup task");
final List<Bucket> buckets = BucketMetadataManagers.getInstance().lookupBu cketsByState(null); final List<Bucket> buckets = BucketMetadataManagers.getInstance().lookupBu cketsByState(null);
if (buckets == null || buckets.size() <= 0) { if (buckets == null || buckets.size() <= 0) {
LOG.trace("No buckets found to clean. Cleanup task complete"); LOG.trace("No buckets found to clean. Cleanup task complete");
return; return;
} }
// Resolve all bucket states (fast) before cleaning histories (slow, could
time out)
for (Bucket bucket : buckets) {
if (!isTimedOut() && !interrupted) {
resolveBucketState(bucket);
bucketsResolved++;
} else {
LOG.warn("Timed out while cleaning up bucket states after processing "
+ bucketsResolved + " buckets.");
break;
}
}
LOG.trace("Finished resolving " + bucketsResolved + " bucket states.");
// Randomly iterate through the buckets so they all have equal chance of r
unning before a timeout
Bucket b; Bucket b;
// Randomly iterate through
int idx; int idx;
cleaningStarted = true;
while (buckets.size() > 0 && !isTimedOut() && !interrupted) { while (buckets.size() > 0 && !isTimedOut() && !interrupted) {
idx = rand.nextInt(buckets.size()); idx = rand.nextInt(buckets.size());
b = buckets.get(idx); b = buckets.get(idx);
cleanObjectHistoriesInBucket(b); cleanObjectHistoriesInBucket(b,timeoutLevelSupplier);
resolveBucketState(b);
buckets.remove(idx); buckets.remove(idx);
bucketsCleaned++;
} }
} catch (final Throwable f) { } catch (final Throwable f) {
LOG.error("Error during bucket cleanup execution. Will retry later", f); LOG.error("Error during bucket cleanup execution. Will retry later", f);
} finally { } finally {
try { try {
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
LOG.trace("Bucket cleanup execution task took " + Long.toString(endTime - startTime) + "ms to complete"); LOG.trace("Bucket cleanup execution task took " + Long.toString(endTime - startTime) + "ms to complete");
if (isTimedOut() || interrupted) {
if (!cleaningStarted) {
LOG.log(timeoutLevelSupplier.get(),
"Timed out while cleaning up bucket states after processing " +
bucketsResolved + " buckets. " +
"No object records were cleaned up.");
} else {
LOG.log(timeoutLevelSupplier.get(),
"Timed out while cleaning up object records after processing " +
bucketsCleaned + " buckets.");
}
}
} catch (final Throwable f) { } catch (final Throwable f) {
// Do nothing, but don't allow exceptions out // Do nothing, but don't allow exceptions out
} }
} }
} }
public void interrupt() { public void interrupt() {
this.interrupted = true; this.interrupted = true;
} }
skipping to change at line 107 skipping to change at line 139
this.interrupted = false; this.interrupted = false;
} }
/** /**
* Fixes the state of the bucket. If in 'deleting' state, will issue deletion to backend. And remove the bucket. If in 'creating' state that is * Fixes the state of the bucket. If in 'deleting' state, will issue deletion to backend. And remove the bucket. If in 'creating' state that is
* expired (by timestamp), will issue delete to backend and update state * expired (by timestamp), will issue delete to backend and update state
* *
* @param bucket * @param bucket
*/ */
private void resolveBucketState(Bucket bucket) { private void resolveBucketState(Bucket bucket) {
LOG.trace("Resolving bucket state for bucket uuid " + bucket.getBucketUuid() ); LOG.trace("Resolving bucket state for bucket uuid " + bucket.getBucketUuid() + ", name " + bucket.getBucketName());
if (BucketState.deleting.equals(bucket.getState()) if (BucketState.deleting.equals(bucket.getState())
|| !bucket.stateStillValid(ConfigurationCache.getConfiguration(ObjectSto rageGlobalConfiguration.class) || !bucket.stateStillValid(ConfigurationCache.getConfiguration(ObjectSto rageGlobalConfiguration.class)
.getBucket_creation_wait_interval_seconds())) { .getBucket_creation_wait_interval_seconds())) {
// Clean-up a bucket marked for deletion. This usually indicates a failed delete operation previously // Clean-up a bucket marked for deletion. This usually indicates a failed delete operation previously
LOG.trace("Deleting backend bucket for bucket uuid " + bucket.getBucketUui d() + " during bucket cleanup"); LOG.trace("Deleting backend bucket for bucket uuid " + bucket.getBucketUui d() + " during bucket cleanup");
try { try {
OsgBucketFactory.getFactory().deleteBucket(ObjectStorageProviders.getIns tance(), bucket, null, null); OsgBucketFactory.getFactory().deleteBucket(ObjectStorageProviders.getIns tance(), bucket, null, null);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error cleaning deletion marked bucketuuid " + bucket.getBucke tUuid(), e); LOG.error("Error cleaning deletion marked bucketuuid " + bucket.getBucke tUuid(), e);
} }
} }
} }
private boolean isTimedOut() { private boolean isTimedOut() {
return System.currentTimeMillis() - startTime >= MAX_TASK_DURATION; return System.currentTimeMillis() >= timeoutTime;
} }
protected void cleanObjectHistoriesInBucket(Bucket b) { protected void cleanObjectHistoriesInBucket( final Bucket b, final Supplier<Le vel> timeoutLevelSupplier ) {
String nextKey = null; String nextKey = null;
final int chunkSize = 1000; final int chunkSize = 1000;
int objectsProcessed = 0;
PaginatedResult<ObjectEntity> result = null; PaginatedResult<ObjectEntity> result = null;
LOG.trace("Cleaning object histories for bucket uuid " + b.getBucketUuid()); LOG.trace("Cleaning object histories for bucket uuid " + b.getBucketUuid() + ", name " + b.getBucketName());
do { do {
try { try {
result = ObjectMetadataManagers.getInstance().listPaginated(b, chunkSize , null, null, nextKey); result = ObjectMetadataManagers.getInstance().listPaginated(b, chunkSize , null, null, nextKey);
} catch (final Throwable f) { } catch (final Throwable f) {
LOG.error("Could not get object listing for bucket " + b.getBucketName() + " with next marker: " + nextKey); LOG.error("Could not get object listing for bucket " + b.getBucketName() + " with next marker: " + nextKey);
nextKey = null;
result = null;
break; break;
} }
INNER: for (ObjectEntity obj : result.getEntityList()) { INNER: for (ObjectEntity obj : result.getEntityList()) {
try { try {
ObjectMetadataManagers.getInstance().cleanupInvalidObjects(b, obj.getO bjectKey()); ObjectMetadataManagers.getInstance().cleanupInvalidObjects(b, obj.getO bjectKey());
objectsProcessed++;
if ((objectsProcessed % 1000) == 0) {
LOG.trace("Processed " + objectsProcessed + " objects for bucket uui
d " + b.getBucketUuid() +
", name " + b.getBucketName());
}
} catch (final Throwable f) { } catch (final Throwable f) {
LOG.error("Error doing async repair of object " + b.getBucketName() + "/" + obj.getObjectKey() + " Continuing to next object", f); LOG.error("Error doing async repair of object " + b.getBucketName() + "/" + obj.getObjectKey() + " Continuing to next object", f);
} }
if (interrupted) { if (interrupted) {
break INNER; break INNER;
} }
} }
if (!interrupted && result.getIsTruncated()) { if (!interrupted && result.getIsTruncated()) {
nextKey = ((ObjectEntity) result.getLastEntry()).getObjectKey(); nextKey = ((ObjectEntity) result.getLastEntry()).getObjectKey();
} else { } else {
nextKey = null; nextKey = null;
} }
} while (nextKey != null && !isTimedOut()); } while (nextKey != null && !isTimedOut());
if (interrupted || isTimedOut()) {
LOG.log( timeoutLevelSupplier.get( ),
"Timed out while cleaning up object records in bucket uuid " +
b.getBucketUuid() + ", name " + b.getBucketName() +
" after processing " + objectsProcessed + " objects.");
} else {
LOG.trace("Finished cleaning " + objectsProcessed + " object histories for
bucket uuid " +
b.getBucketUuid() + ", name " + b.getBucketName());
}
}
private boolean logTimeout( ) {
final long lastToken = TIMEOUT_LOG_TOKEN.get( );
final long currentToken = System.currentTimeMillis( ) / TIMEOUT_LOG_INTERVAL
;
return lastToken!=currentToken && TIMEOUT_LOG_TOKEN.compareAndSet( lastToken
, currentToken );
} }
} }
 End of changes. 23 change blocks. 
24 lines changed or deleted 83 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)