"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "solr/core/src/java/org/apache/solr/filestore/DistribPackageStore.java" between
solr-8.4.0-src.tgz and solr-8.4.1-src.tgz

About: Solr is the search platform from the Apache Lucene project. Its major features include full-text search, hit highlighting, faceted search, caching, replication, and a web admin interface.

DistribPackageStore.java  (solr-8.4.0-src.tgz):DistribPackageStore.java  (solr-8.4.1-src.tgz)
skipping to change at line 33 skipping to change at line 33
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Predicate; import java.util.function.Predicate;
import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.DigestUtils;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.Utils; import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreContainer;
import org.apache.solr.filestore.PackageStoreAPI.MetaData; import org.apache.solr.filestore.PackageStoreAPI.MetaData;
import org.apache.solr.util.SimplePostTool;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.server.ByteBufferInputStream; import org.apache.zookeeper.server.ByteBufferInputStream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST; import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR; import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
public class DistribPackageStore implements PackageStore { public class DistribPackageStore implements PackageStore {
static final long MAX_PKG_SIZE = Long.parseLong(System.getProperty("max.file.s tore.size", String.valueOf(100 * 1024 * 1024))); static final long MAX_PKG_SIZE = Long.parseLong(System.getProperty("max.file.s tore.size", String.valueOf(100 * 1024 * 1024)));
/**
* This is where al the files in the package store are listed
*/
static final String ZK_PACKAGESTORE = "/packagestore";
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup ().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup ().lookupClass());
private final CoreContainer coreContainer; private final CoreContainer coreContainer;
private Map<String, FileInfo> tmpFiles = new ConcurrentHashMap<>(); private Map<String, FileInfo> tmpFiles = new ConcurrentHashMap<>();
private final Path solrhome;
public DistribPackageStore(CoreContainer coreContainer) { public DistribPackageStore(CoreContainer coreContainer) {
this.coreContainer = coreContainer; this.coreContainer = coreContainer;
solrhome = this.coreContainer.getResourceLoader().getInstancePath();
ensurePackageStoreDir(coreContainer.getResourceLoader().getInstancePath()); ensurePackageStoreDir(coreContainer.getResourceLoader().getInstancePath());
} }
private String myNode() {
return coreContainer.getZkController().getNodeName();
}
@Override @Override
public Path getRealpath(String path) { public Path getRealpath(String path) {
return _getRealPath(path, solrhome);
}
private static Path _getRealPath(String path, Path solrHome) {
if (File.separatorChar == '\\') { if (File.separatorChar == '\\') {
path = path.replace('/', File.separatorChar); path = path.replace('/', File.separatorChar);
} }
if (!path.isEmpty() && path.charAt(0) != File.separatorChar) { if (!path.isEmpty() && path.charAt(0) != File.separatorChar) {
path = File.separator + path; path = File.separator + path;
} }
return new File(this.coreContainer.getResourceLoader().getInstancePath() + return new File(solrHome +
File.separator + PackageStoreAPI.PACKAGESTORE_DIRECTORY + path).toPath() ; File.separator + PackageStoreAPI.PACKAGESTORE_DIRECTORY + path).toPath() ;
} }
class FileInfo { class FileInfo {
final String path; final String path;
String metaPath; String metaPath;
ByteBuffer fileData, metaData; ByteBuffer fileData, metaData;
FileInfo(String path) { FileInfo(String path) {
this.path = path; this.path = path;
} }
ByteBuffer getFileData(boolean validate) throws IOException {
if (fileData == null) {
try (FileInputStream fis = new FileInputStream(getRealpath(path).toFile(
))) {
fileData = SimplePostTool.inputStreamToByteArray(fis);
}
}
return fileData;
}
public String getMetaPath() { public String getMetaPath() {
if (metaPath == null) { if (metaPath == null) {
int idx = path.lastIndexOf('/'); metaPath = _getMetapath(path);
metaPath = path.substring(0, idx + 1) + "." + path.substring(idx + 1) +
".json";
} }
return metaPath; return metaPath;
} }
private void persistToFile(ByteBuffer data, ByteBuffer meta) throws IOExcept ion { private void persistToFile(ByteBuffer data, ByteBuffer meta) throws IOExcept ion {
synchronized (DistribPackageStore.this) { synchronized (DistribPackageStore.this) {
this.metaData = meta; this.metaData = meta;
this.fileData = data; this.fileData = data;
Path realpath = getRealpath(path); _persistToFile(solrhome, path, data, meta);
File file = realpath.toFile();
File parent = file.getParentFile();
if (!parent.exists()) {
parent.mkdirs();
}
Map m = (Map) Utils.fromJSON(meta.array(), meta.arrayOffset(), meta.limi
t());
if (m == null || m.isEmpty()) {
throw new SolrException(SERVER_ERROR, "invalid metadata , discarding :
" + path);
}
File metdataFile = getRealpath(getMetaPath()).toFile();
try (FileOutputStream fos = new FileOutputStream(metdataFile)) {
fos.write(meta.array(), 0, meta.limit());
}
IOUtils.fsync(metdataFile.toPath(), false);
try (FileOutputStream fos = new FileOutputStream(file)) {
fos.write(data.array(), 0, data.limit());
}
log.info("persisted a file {} and metadata. sizes {} {}", path, data.lim it(), meta.limit()); log.info("persisted a file {} and metadata. sizes {} {}", path, data.lim it(), meta.limit());
IOUtils.fsync(file.toPath(), false);
} }
} }
public boolean exists(boolean validateContent, boolean fetchMissing) throws IOException { public boolean exists(boolean validateContent, boolean fetchMissing) throws IOException {
File file = getRealpath(path).toFile(); File file = getRealpath(path).toFile();
if (!file.exists()) { if (!file.exists()) {
if (fetchMissing) { if (fetchMissing) {
return fetchFromAnyNode(); return fetchFromAnyNode();
} else { } else {
return false; return false;
skipping to change at line 319 skipping to change at line 319
} }
} }
@Override @Override
public void put(FileEntry entry) throws IOException { public void put(FileEntry entry) throws IOException {
FileInfo info = new FileInfo(entry.path); FileInfo info = new FileInfo(entry.path);
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
Utils.writeJson(entry.getMetaData(), baos, true); Utils.writeJson(entry.getMetaData(), baos, true);
byte[] bytes = baos.toByteArray(); byte[] bytes = baos.toByteArray();
info.persistToFile(entry.buf, ByteBuffer.wrap(bytes, 0, bytes.length)); info.persistToFile(entry.buf, ByteBuffer.wrap(bytes, 0, bytes.length));
tmpFiles.put(entry.getPath(), info); distribute(info);
}
private void distribute(FileInfo info) {
try {
String dirName = info.path.substring(0, info.path.lastIndexOf('/'));
coreContainer.getZkController().getZkClient().makePath(ZK_PACKAGESTORE + d
irName, false, true);
coreContainer.getZkController().getZkClient().create(ZK_PACKAGESTORE + inf
o.path, info.getDetails().getMetaData().sha512.getBytes(UTF_8),
CreateMode.PERSISTENT, true);
} catch (Exception e) {
throw new SolrException(SERVER_ERROR, "Unable to create an entry in ZK", e
);
}
tmpFiles.put(info.path, info);
List<String> nodes = coreContainer.getPackageStoreAPI().shuffledNodes(); List<String> nodes = coreContainer.getPackageStoreAPI().shuffledNodes();
int i = 0; int i = 0;
int FETCHFROM_SRC = 50; int FETCHFROM_SRC = 50;
String myNodeName = coreContainer.getZkController().getNodeName(); String myNodeName = coreContainer.getZkController().getNodeName();
try { try {
for (String node : nodes) { for (String node : nodes) {
String baseUrl = coreContainer.getZkController().getZkStateReader().getB aseUrlForNodeName(node); String baseUrl = coreContainer.getZkController().getZkStateReader().getB aseUrlForNodeName(node);
String url = baseUrl.replace("/solr", "/api") + "/node/files" + entry.ge tPath() + "?getFrom="; String url = baseUrl.replace("/solr", "/api") + "/node/files" + info.pat h + "?getFrom=";
if (i < FETCHFROM_SRC) { if (i < FETCHFROM_SRC) {
// this is to protect very large clusters from overwhelming a single n ode // this is to protect very large clusters from overwhelming a single n ode
// the first FETCHFROM_SRC nodes will be asked to fetch from this node . // the first FETCHFROM_SRC nodes will be asked to fetch from this node .
// it's there in the memory now. So , it must be served fast // it's there in the memory now. So , it must be served fast
url += myNodeName; url += myNodeName;
} else { } else {
if (i == FETCHFROM_SRC) { if (i == FETCHFROM_SRC) {
// This is just an optimization // This is just an optimization
// at this point a bunch of nodes are already downloading from me // at this point a bunch of nodes are already downloading from me
// I'll wait for them to finish before asking other nodes to downloa d from each other // I'll wait for them to finish before asking other nodes to downloa d from each other
skipping to change at line 364 skipping to change at line 377
//ignore the exception //ignore the exception
// some nodes may be down or not responding // some nodes may be down or not responding
} }
i++; i++;
} }
} finally { } finally {
coreContainer.getUpdateShardHandler().getUpdateExecutor().submit(() -> { coreContainer.getUpdateShardHandler().getUpdateExecutor().submit(() -> {
try { try {
Thread.sleep(10 * 1000); Thread.sleep(10 * 1000);
} finally { } finally {
tmpFiles.remove(entry.getPath()); tmpFiles.remove(info.path);
} }
return null; return null;
}); });
} }
} }
@Override @Override
public boolean fetch(String path, String from) { public boolean fetch(String path, String from) {
if (path == null || path.isEmpty()) return false; if (path == null || path.isEmpty()) return false;
FileInfo f = new FileInfo(path); FileInfo f = new FileInfo(path);
try { try {
if (f.exists(true, false)) { if (f.exists(true, false)) {
return true; return true;
} }
skipping to change at line 424 skipping to change at line 436
} }
}); });
} }
return; return;
} }
new FileInfo(path).readData(consumer); new FileInfo(path).readData(consumer);
} }
@Override @Override
public void syncToAllNodes(String path) throws IOException {
FileInfo fi = new FileInfo(path);
if (!fi.exists(true, false)) {
throw new SolrException(BAD_REQUEST, "No such file : " + path);
}
fi.getFileData(true);
distribute(fi);
}
@Override
public List list(String path, Predicate<String> predicate) { public List list(String path, Predicate<String> predicate) {
File file = getRealpath(path).toFile(); File file = getRealpath(path).toFile();
List<FileDetails> fileDetails = new ArrayList<>(); List<FileDetails> fileDetails = new ArrayList<>();
FileType type = getType(path, false); FileType type = getType(path, false);
if (type == FileType.DIRECTORY) { if (type == FileType.DIRECTORY) {
file.list((dir, name) -> { file.list((dir, name) -> {
if (predicate == null || predicate.test(name)) { if (predicate == null || predicate.test(name)) {
if (!isMetaDataFile(name)) { if (!isMetaDataFile(name)) {
fileDetails.add(new FileInfo(path + "/" + name).getDetails()); fileDetails.add(new FileInfo(path + "/" + name).getDetails());
} }
skipping to change at line 446 skipping to change at line 468
}); });
} else if (type == FileType.FILE) { } else if (type == FileType.FILE) {
fileDetails.add(new FileInfo(path).getDetails()); fileDetails.add(new FileInfo(path).getDetails());
} }
return fileDetails; return fileDetails;
} }
@Override @Override
public void refresh(String path) {
try {
List l = null;
try {
l = coreContainer.getZkController().getZkClient().getChildren(ZK_PACKAGE
STORE+ path, null, true);
} catch (KeeperException.NoNodeException e) {
// does not matter
}
if (l != null && !l.isEmpty()) {
List myFiles = list(path, s -> true);
for (Object f : l) {
if (!myFiles.contains(f)) {
log.info("{} does not exist locally, downloading.. ",f);
fetch(path + "/" + f.toString(), "*");
}
}
}
} catch (Exception e) {
log.error("Could not refresh files in " +path, e);
}
}
@Override
public FileType getType(String path, boolean fetchMissing) { public FileType getType(String path, boolean fetchMissing) {
File file = getRealpath(path).toFile(); File file = getRealpath(path).toFile();
if (!file.exists() && fetchMissing) { if (!file.exists() && fetchMissing) {
if (fetch(path, null)) { if (fetch(path, null)) {
file = getRealpath(path).toFile(); file = getRealpath(path).toFile();
} }
} }
return _getFileType(file);
}
public static FileType _getFileType(File file) {
if (!file.exists()) return FileType.NOFILE; if (!file.exists()) return FileType.NOFILE;
if (file.isDirectory()) return FileType.DIRECTORY; if (file.isDirectory()) return FileType.DIRECTORY;
return isMetaDataFile(file.getName()) ? FileType.METADATA : FileType.FILE; return isMetaDataFile(file.getName()) ? FileType.METADATA : FileType.FILE;
} }
private boolean isMetaDataFile(String file) { public static boolean isMetaDataFile(String file) {
return file.charAt(0) == '.' && file.endsWith(".json"); return file.charAt(0) == '.' && file.endsWith(".json");
} }
private void ensurePackageStoreDir(Path solrHome) { private void ensurePackageStoreDir(Path solrHome) {
final File packageStoreDir = getPackageStoreDirPath(solrHome).toFile(); final File packageStoreDir = getPackageStoreDirPath(solrHome).toFile();
if (!packageStoreDir.exists()) { if (!packageStoreDir.exists()) {
try { try {
final boolean created = packageStoreDir.mkdirs(); final boolean created = packageStoreDir.mkdirs();
if (!created) { if (!created) {
log.warn("Unable to create [{}] directory in SOLR_HOME [{}]. Features requiring this directory may fail.", packageStoreDir, solrHome); log.warn("Unable to create [{}] directory in SOLR_HOME [{}]. Features requiring this directory may fail.", packageStoreDir, solrHome);
} }
} catch (Exception e) { } catch (Exception e) {
log.warn("Unable to create [" + packageStoreDir + "] directory in SOLR_H OME [" + solrHome + "]. Features requiring this directory may fail.", e); log.warn("Unable to create [" + packageStoreDir + "] directory in SOLR_H OME [" + solrHome + "]. Features requiring this directory may fail.", e);
} }
} }
} }
public static Path getPackageStoreDirPath(Path solrHome) { public static Path getPackageStoreDirPath(Path solrHome) {
return Paths.get(solrHome.toAbsolutePath().toString(), PackageStoreAPI.PACKA GESTORE_DIRECTORY).toAbsolutePath(); return Paths.get(solrHome.toAbsolutePath().toString(), PackageStoreAPI.PACKA GESTORE_DIRECTORY).toAbsolutePath();
} }
private static String _getMetapath(String path) {
int idx = path.lastIndexOf('/');
return path.substring(0, idx + 1) + "." + path.substring(idx + 1) + ".json";
}
/**
* Internal API
*/
public static void _persistToFile(Path solrHome, String path, ByteBuffer data,
ByteBuffer meta) throws IOException {
Path realpath = _getRealPath(path, solrHome);
File file = realpath.toFile();
File parent = file.getParentFile();
if (!parent.exists()) {
parent.mkdirs();
}
Map m = (Map) Utils.fromJSON(meta.array(), meta.arrayOffset(), meta.limit())
;
if (m == null || m.isEmpty()) {
throw new SolrException(SERVER_ERROR, "invalid metadata , discarding : " +
path);
}
File metdataFile = _getRealPath(_getMetapath(path), solrHome).toFile();
try (FileOutputStream fos = new FileOutputStream(metdataFile)) {
fos.write(meta.array(), 0, meta.limit());
}
IOUtils.fsync(metdataFile.toPath(), false);
try (FileOutputStream fos = new FileOutputStream(file)) {
fos.write(data.array(), 0, data.limit());
}
IOUtils.fsync(file.toPath(), false);
}
@Override
public Map<String, byte[]> getKeys() throws IOException {
return _getKeys(solrhome);
}
// reads local keys file
private static Map<String, byte[]> _getKeys(Path solrhome) throws IOException
{
Map<String, byte[]> result = new HashMap<>();
Path keysDir = _getRealPath(PackageStoreAPI.KEYS_DIR, solrhome);
File[] keyFiles = keysDir.toFile().listFiles();
if (keyFiles == null) return result;
for (File keyFile : keyFiles) {
if (keyFile.isFile() && !isMetaDataFile(keyFile.getName())) {
try (InputStream fis = new FileInputStream(keyFile)) {
result.put(keyFile.getName(), SimplePostTool.inputStreamToByteArray(fi
s).array());
}
}
}
return result;
}
} }
 End of changes. 22 change blocks. 
37 lines changed or deleted 148 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)