Initial work on async request queue.

This commit is contained in:
Josh Stark 2019-06-16 10:57:49 +01:00
parent 8bca4b9709
commit 3b615adf4f
18 changed files with 554 additions and 22 deletions

View File

@ -27,7 +27,11 @@ import io.linuxserver.fleet.db.dao.DefaultUserDAO;
import io.linuxserver.fleet.db.migration.DatabaseVersion;
import io.linuxserver.fleet.delegate.*;
import io.linuxserver.fleet.dockerhub.DockerHubV2Client;
import io.linuxserver.fleet.dockerhub.queue.DockerHubSyncConsumer;
import io.linuxserver.fleet.dockerhub.queue.DockerHubSyncQueue;
import io.linuxserver.fleet.thread.TaskManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
@ -36,6 +40,8 @@ import io.linuxserver.fleet.thread.TaskManager;
*/
public class FleetBeans {
private static final Logger LOGGER = LoggerFactory.getLogger(FleetBeans.class);
private final FleetProperties properties;
private final ImageDelegate imageDelegate;
private final RepositoryDelegate repositoryDelegate;
@ -46,6 +52,7 @@ public class FleetBeans {
private final TaskDelegate taskDelegate;
private final UserDelegate userDelegate;
private final PasswordEncoder passwordEncoder;
private final DockerHubSyncQueue dockerHubSyncQueue;
/**
* Ensures the database is kept up to date.
@ -68,6 +75,14 @@ public class FleetBeans {
userDelegate = new UserDelegate(passwordEncoder, new DefaultUserDAO(databaseConnection));
taskDelegate = new TaskDelegate(this);
authenticationDelegate = new DefaultAuthenticationDelegate(AuthenticatorFactory.getAuthenticator(this));
dockerHubSyncQueue = new DockerHubSyncQueue();
final int consumerThreadCount = properties.getQueueThreadCount();
for (int i = 0; i < consumerThreadCount; i++) {
LOGGER.info("Starting consumer thread " + i + "...");
new DockerHubSyncConsumer(imageDelegate, dockerHubSyncQueue, "SyncThread-" + i).start();
}
}
public FleetProperties getProperties() {

View File

@ -79,6 +79,10 @@ public class FleetProperties {
return new DockerHubCredentials(username, password);
}
public int getQueueThreadCount() {
return Integer.parseInt(getStringProperty("fleet.queue.threads"));
}
/**
* <p>
* Obtains the property value from three separate sources: first from the config file. If not present, it will look

View File

@ -23,6 +23,7 @@ import io.linuxserver.fleet.db.query.InsertUpdateStatus;
import io.linuxserver.fleet.db.query.LimitedResult;
import io.linuxserver.fleet.model.internal.Image;
import io.linuxserver.fleet.model.internal.ImagePullStat;
import io.linuxserver.fleet.model.internal.Repository;
import io.linuxserver.fleet.model.internal.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -230,7 +231,10 @@ public class DefaultImageDAO implements ImageDAO {
Image image = new Image(
results.getInt("ImageId"),
results.getInt("RepositoryId"),
new Repository(
results.getInt("RepositoryId"),
results.getString("RepositoryName")
),
results.getString("ImageName"),
new Tag(
results.getString("LatestTagVersion"),

View File

@ -63,6 +63,10 @@ public class DockerHubDelegate {
return images;
}
public DockerImage fetchImageFromRepository(String repositoryName, String imageName) {
return convertImage(dockerHubClient.fetchImageFromRepository(repositoryName, imageName));
}
public List<DockerTag> fetchAllTagsForImage(String repositoryName, String imageName) {
return dockerHubClient.fetchAllTagsForImage(repositoryName, imageName).stream().map(this::convertTag).collect(Collectors.toList());
}
@ -80,6 +84,10 @@ public class DockerHubDelegate {
private DockerImage convertImage(DockerHubV2Image dockerHubV2Image) {
if (dockerHubV2Image == null) {
return null;
}
return new DockerImage(
dockerHubV2Image.getName(),
dockerHubV2Image.getNamespace(),
@ -92,6 +100,10 @@ public class DockerHubDelegate {
private DockerTag convertTag(DockerHubV2Tag dockerHubV2Tag) {
if (dockerHubV2Tag == null) {
return null;
}
return new DockerTag(
dockerHubV2Tag.getName(),
dockerHubV2Tag.getFullSize(),

View File

@ -0,0 +1,47 @@
/*
* Copyright (c) 2019 LinuxServer.io
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package io.linuxserver.fleet.dockerhub.queue;
import io.linuxserver.fleet.delegate.ImageDelegate;
import io.linuxserver.fleet.exception.SaveException;
import io.linuxserver.fleet.model.internal.Image;
import io.linuxserver.fleet.queue.AbstractQueueConsumer;
import io.linuxserver.fleet.queue.RequestQueue;
public class DockerHubSyncConsumer extends AbstractQueueConsumer<DockerHubSyncResponse, DockerHubSyncRequest> {
private final ImageDelegate imageDelegate;
public DockerHubSyncConsumer(ImageDelegate imageDelegate, RequestQueue<DockerHubSyncRequest> requestQueue, String name) {
super(requestQueue, name);
this.imageDelegate = imageDelegate;
}
@Override
protected void handleResponse(DockerHubSyncResponse response) {
try {
final Image image = response.getImage();
imageDelegate.saveImage(image);
} catch (SaveException e) {
getLogger().error("handleResponse unable to save image: {}", e.getMessage());
}
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright (c) 2019 LinuxServer.io
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package io.linuxserver.fleet.dockerhub.queue;
import io.linuxserver.fleet.queue.AbstractRequestQueue;
public class DockerHubSyncQueue extends AbstractRequestQueue<DockerHubSyncRequest> {
}

View File

@ -0,0 +1,54 @@
/*
* Copyright (c) 2019 LinuxServer.io
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package io.linuxserver.fleet.dockerhub.queue;
import io.linuxserver.fleet.delegate.DockerHubDelegate;
import io.linuxserver.fleet.model.internal.Image;
import io.linuxserver.fleet.queue.FleetRequest;
import java.util.Objects;
public class DockerHubSyncRequest implements FleetRequest<DockerHubSyncResponse> {
private final DockerHubDelegate dockerHubDelegate;
private final Image image;
public DockerHubSyncRequest(DockerHubDelegate dockerHubDelegate, Image image) {
this.dockerHubDelegate = dockerHubDelegate;
this.image = image;
}
@Override
public DockerHubSyncResponse execute() {
return new DockerHubSyncResponse(dockerHubDelegate, image);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DockerHubSyncRequest that = (DockerHubSyncRequest) o;
return Objects.equals(image, that.image);
}
@Override
public int hashCode() {
return Objects.hash(image);
}
}

View File

@ -0,0 +1,99 @@
/*
* Copyright (c) 2019 LinuxServer.io
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package io.linuxserver.fleet.dockerhub.queue;
import io.linuxserver.fleet.delegate.DockerHubDelegate;
import io.linuxserver.fleet.model.docker.DockerImage;
import io.linuxserver.fleet.model.docker.DockerTag;
import io.linuxserver.fleet.model.internal.Image;
import io.linuxserver.fleet.model.internal.Tag;
import io.linuxserver.fleet.queue.FleetResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class DockerHubSyncResponse implements FleetResponse {
private final DockerHubDelegate dockerHubDelegate;
private final Image image;
public DockerHubSyncResponse(DockerHubDelegate dockerHubDelegate, Image image) {
this.dockerHubDelegate = dockerHubDelegate;
this.image = Image.copyOf(image);
}
@Override
public void handle() {
DockerImage dockerImage = dockerHubDelegate.fetchImageFromRepository(image.getRepository().getName(), image.getName());
String versionMask = getVersionMask(image.getRepository().getVersionMask(), image.getVersionMask());
Tag maskedVersion = getLatestTagAndCreateMaskedVersion(versionMask);
image.withPullCount(dockerImage.getPullCount());
image.updateTag(maskedVersion);
}
public Image getImage() {
return image;
}
private String getVersionMask(String repositoryMask, String imageMask) {
return imageMask == null ? repositoryMask : imageMask;
}
private Tag getLatestTagAndCreateMaskedVersion(String versionMask) {
DockerTag tag = dockerHubDelegate.fetchLatestImageTag(image.getRepository().getName(), image.getName());
if (null == tag)
return Tag.NONE;
if (isTagJustLatestAndNotAVersion(tag) || null == versionMask)
return new Tag(tag.getName(), tag.getName(), tag.getBuildDate());
return new Tag(tag.getName(), extractMaskedVersion(tag.getName(), versionMask), tag.getBuildDate());
}
private String extractMaskedVersion(String fullTag, String versionMask) {
Pattern pattern = Pattern.compile(versionMask);
Matcher matcher = pattern.matcher(fullTag);
if (matcher.matches()) {
StringBuilder tagBuilder = new StringBuilder();
for (int groupNum = 1; groupNum <= matcher.groupCount(); groupNum++)
tagBuilder.append(matcher.group(groupNum));
return tagBuilder.toString();
}
return fullTag;
}
/**
* <p>
* If the top-level tag is not versioned, a mask can't be applied.
* </p>
*/
private boolean isTagJustLatestAndNotAVersion(DockerTag tag) {
return "latest".equals(tag.getName());
}
}

View File

@ -29,34 +29,34 @@ import java.util.Objects;
*/
public class Image extends PersistableItem<Image> {
private final int repositoryId;
private final String name;
private final Repository repository;
private final String name;
private Tag tag;
private long pullCount;
private String versionMask;
private boolean unstable;
private boolean hidden;
private Tag tag;
private long pullCount;
private String versionMask;
private boolean unstable;
private boolean hidden;
private boolean deprecated;
private String deprecationReason;
private boolean deprecated;
private String deprecationReason;
public Image(Integer id, int repositoryId, String name, Tag latestVersion) {
public Image(Integer id, Repository repository, String name, Tag latestVersion) {
super(id);
this.name = name;
this.repositoryId = repositoryId;
this.repository = repository;
this.tag = new Tag(latestVersion.getVersion(), latestVersion.getMaskedVersion(), latestVersion.getBuildDate());
}
public Image(int repositoryId, String name) {
this(null, repositoryId, name, Tag.NONE);
public Image(Repository repository, String name) {
this(null, repository, name, Tag.NONE);
}
public static Image copyOf(Image image) {
Image cloned = new Image(image.getId(), image.repositoryId, image.name, image.tag);
Image cloned = new Image(image.getId(), Repository.copyOf(image.repository), image.name, image.tag);
cloned.pullCount = image.pullCount;
cloned.versionMask = image.versionMask;
cloned.unstable = image.unstable;
@ -108,7 +108,11 @@ public class Image extends PersistableItem<Image> {
}
public int getRepositoryId() {
return repositoryId;
return repository.getId();
}
public Repository getRepository() {
return repository;
}
public String getName() {
@ -166,11 +170,11 @@ public class Image extends PersistableItem<Image> {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Image image = (Image) o;
return repositoryId == image.repositoryId && Objects.equals(name, image.name);
return Objects.equals(repository, image.repository) && Objects.equals(name, image.name);
}
@Override
public int hashCode() {
return Objects.hash(repositoryId, name);
return Objects.hash(repository, name);
}
}

View File

@ -17,6 +17,8 @@
package io.linuxserver.fleet.model.internal;
import java.util.Objects;
public class Repository extends PersistableItem<Repository> {
private final String name;
@ -36,6 +38,10 @@ public class Repository extends PersistableItem<Repository> {
this.name = name;
}
public static Repository copyOf(Repository repository) {
return new Repository(repository.getId(), repository.name).withVersionMask(repository.versionMask).withSyncEnabled(repository.syncEnabled);
}
public Repository withVersionMask(String versionMask) {
this.versionMask = versionMask;
@ -59,4 +65,17 @@ public class Repository extends PersistableItem<Repository> {
public boolean isSyncEnabled() {
return syncEnabled;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Repository that = (Repository) o;
return Objects.equals(name, that.name);
}
@Override
public int hashCode() {
return Objects.hash(name);
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright (c) 2019 LinuxServer.io
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package io.linuxserver.fleet.queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractQueueConsumer<R extends FleetResponse, T extends FleetRequest<R>> extends Thread implements QueueConsumer<T> {
private final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
private final RequestQueue<T> requestQueue;
public AbstractQueueConsumer(final RequestQueue<T> requestQueue, final String name) {
super(name + "-Consumer");
this.requestQueue = requestQueue;
}
@Override
public void run() {
while (true) {
consume();
}
}
@Override
public void start() {
getLogger().info("Starting...");
super.start();
}
@Override
public void consume() {
try {
T request = requestQueue.takeOneRequest();
R response = request.execute();
handleResponse(response);
} catch (Exception e) {
getLogger().error("consume caught unhandled exception", e);
}
}
protected abstract void handleResponse(R response);
public Logger getLogger() {
return LOGGER;
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright (c) 2019 LinuxServer.io
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package io.linuxserver.fleet.queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class AbstractRequestQueue<T extends FleetRequest<? extends FleetResponse>> implements RequestQueue<T> {
private final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
private final BlockingQueue<T> requestQueue;
public AbstractRequestQueue() {
this.requestQueue = new LinkedBlockingDeque<>();
}
@Override
public synchronized void enqueueRequest(final T request) {
try {
if (!requestQueue.contains(request)) {
final boolean added = requestQueue.add(request);
if (!added) {
LOGGER.warn("enqueueRequest unable to add request to queue");
}
}
} catch (Exception e) {
LOGGER.error("enqueueRequest caught unhandled exception {}", e.getMessage());
}
}
@Override
public synchronized T takeOneRequest() {
try {
return requestQueue.take();
} catch (InterruptedException e) {
LOGGER.error("takeOneRequest was interrupted: {}", e.getMessage());
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright (c) 2019 LinuxServer.io
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package io.linuxserver.fleet.queue;
public interface FleetRequest<T extends FleetResponse> {
T execute();
}

View File

@ -0,0 +1,23 @@
/*
* Copyright (c) 2019 LinuxServer.io
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package io.linuxserver.fleet.queue;
public interface FleetResponse {
void handle();
}

View File

@ -0,0 +1,23 @@
/*
* Copyright (c) 2019 LinuxServer.io
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package io.linuxserver.fleet.queue;
public interface QueueConsumer<T extends FleetRequest<?>> {
void consume();
}

View File

@ -0,0 +1,25 @@
/*
* Copyright (c) 2019 LinuxServer.io
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package io.linuxserver.fleet.queue;
public interface RequestQueue<T extends FleetRequest> {
void enqueueRequest(T request);
T takeOneRequest();
}

View File

@ -137,7 +137,7 @@ public class DefaultSynchronisationState implements SynchronisationState {
try {
DockerImage dockerImage = images.get(i);
Image image = configureImage(repository.getId(), dockerImage, context);
Image image = configureImage(repository, dockerImage, context);
String versionMask = getVersionMask(repository.getVersionMask(), image.getVersionMask());
Tag maskedVersion = getLatestTagAndCreateMaskedVersion(repository.getName(), image.getName(), versionMask, context);
@ -230,15 +230,15 @@ public class DefaultSynchronisationState implements SynchronisationState {
* Looks up the image in the database to see if it already exists. If it does, it gets returned, otherwise a base
* image is created with just the top-level information, as the rest will get updated later.
*/
private Image configureImage(int repositoryId, DockerImage dockerHubImage, SynchronisationContext context) {
private Image configureImage(Repository repository, DockerImage dockerHubImage, SynchronisationContext context) {
Image image = context.getImageDelegate().findImageByRepositoryAndImageName(repositoryId, dockerHubImage.getName());
Image image = context.getImageDelegate().findImageByRepositoryAndImageName(repository.getId(), dockerHubImage.getName());
if (isImageNew(image)) {
try {
return context.getImageDelegate().saveImage(new Image(repositoryId, dockerHubImage.getName()));
return context.getImageDelegate().saveImage(new Image(repository, dockerHubImage.getName()));
} catch (SaveException e) {
LOGGER.error("Tried to save new image during sync but failed", e);

View File

@ -0,0 +1,24 @@
DELIMITER //
CREATE OR REPLACE VIEW `Image_View` AS (
SELECT
images.`id` AS `ImageId`,
images.`repository` AS `RepositoryId`,
repositories.`name` AS `RepositoryName`,
images.`name` AS `ImageName`,
images.`pulls` AS `ImagePullCount`,
images.`latest_version` AS `LatestTagVersion`,
images.`latest_version_raw` AS `LatestMaskedTagVersion`,
images.`latest_version_buildtime` AS `LatestTagBuildDate`,
images.`version_mask` AS `ImageVersionMask`,
images.`hidden` AS `ImageHidden`,
images.`unstable` AS `ImageUnstable`,
images.`deprecated` AS `ImageDeprecated`,
images.`deprecation_reason` AS `ImageDeprecationReason`,
images.`modified` AS `ModifiedTime`
FROM
Images images
JOIN Repositories repositories ON repositories.`id` = images.`repository`
);
//