diff --git a/src/main/java/org/gearman/GearmanClient.java b/src/main/java/org/gearman/GearmanClient.java index 464c397..a1b5c81 100644 --- a/src/main/java/org/gearman/GearmanClient.java +++ b/src/main/java/org/gearman/GearmanClient.java @@ -157,6 +157,23 @@ public interface GearmanClient extends GearmanService { */ public GearmanJobReturn submitBackgroundJob(String functionName, byte[] data, GearmanJobPriority priority); + /** + * Submits a background job to a registered job server + * @param functionName + * gearman function name + * @param data + * gearman job data + * @param priority + * gearman job priority + * @param uniqueid + * gearman unique job id (not queue-in the same job multiple times) + * @return + * The job return used to poll submit operation status + * @throws NullPointerException + * If the function name is null + */ + public GearmanJobReturn submitBackgroundJob(String functionName, byte[] data, GearmanJobPriority priority, String uniqueid); + /** * Submits a background job to a registered job server * @param functionName @@ -245,4 +262,5 @@ public interface GearmanClient extends GearmanService { * The policy for handling unexpected disconnects */ public void setLostConnectionPolicy(GearmanLostConnectionPolicy policy); + } \ No newline at end of file diff --git a/src/main/java/org/gearman/impl/client/ClientImpl.java b/src/main/java/org/gearman/impl/client/ClientImpl.java index a3140ad..f2ebf27 100644 --- a/src/main/java/org/gearman/impl/client/ClientImpl.java +++ b/src/main/java/org/gearman/impl/client/ClientImpl.java @@ -30,6 +30,7 @@ import java.util.Deque; import java.util.List; import java.util.Queue; +import java.util.UUID; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -50,7 +51,6 @@ import org.gearman.impl.serverpool.ControllerState; import org.gearman.impl.serverpool.GearmanJobStatusImpl; import org.gearman.impl.util.ByteArray; -import org.gearman.impl.util.GearmanUtils; import org.gearman.impl.util.TaskJoin; public class ClientImpl extends AbstractJobServerPool implements GearmanClient { @@ -459,17 +459,26 @@ public GearmanJobReturn submitBackgroundJob(String functionName, byte[] data, Ge return submitJob(functionName, data, priority, true); } + @Override + public GearmanJobReturn submitBackgroundJob(String functionName, byte[] data, GearmanJobPriority priority, String uniqueId) { + return submitJob(functionName, data, priority, true, uniqueId); + } + private GearmanJobReturn submitJob(String functionName, byte[] data, GearmanJobPriority priority, boolean isBackground) { + return submitJob(functionName, data, priority, isBackground, null); + } + + private GearmanJobReturn submitJob(String functionName, byte[] data, GearmanJobPriority priority, boolean isBackground, String uniqueId) { final GearmanJobReturnImpl jobReturn = new GearmanJobReturnImpl(); - submitJob(jobReturn, functionName, data, priority, isBackground); + submitJob(jobReturn, functionName, data, priority, isBackground, uniqueId); return jobReturn; } - private void submitJob(BackendJobReturn jobReturn, String functionName, byte[] data, GearmanJobPriority priority, boolean isBackground) { + private void submitJob(BackendJobReturn jobReturn, String functionName, byte[] data, GearmanJobPriority priority, boolean isBackground, String uniqueId) { if(functionName==null) throw new NullPointerException(); if(data==null) data = new byte[0]; if(priority==null) priority = GearmanJobPriority.NORMAL_PRIORITY; - + if(uniqueId==null) uniqueId = UUID.randomUUID().toString(); if(this.isShutdown()) { jobReturn.eof(GearmanJobEventImmutable.GEARMAN_SUBMIT_FAIL_SERVICE_SHUTDOWN); return; @@ -478,7 +487,7 @@ private void submitJob(BackendJobReturn jobReturn, String functionName, byte[] d return; } - this.addJob(new ClientJobSubmission(functionName, data, GearmanUtils.createUID() , jobReturn, priority, isBackground)); + this.addJob(new ClientJobSubmission(functionName, data, uniqueId.getBytes() , jobReturn, priority, isBackground)); } @Override @@ -505,7 +514,7 @@ private GearmanJoin submitJob(String functionName, byte[] data, GearmanJo if(callback==null) throw new NullPointerException(); final GearmanJobEventCallbackCaller jobReturn = new GearmanJobEventCallbackCaller(attachment, callback, this.getGearman().getScheduler()); - submitJob(jobReturn, functionName, data, priority, isBackground); + submitJob(jobReturn, functionName, data, priority, isBackground, null); return jobReturn; } } diff --git a/src/test/java/org/gearman/test/uniquejob/UniqueTetsts.java b/src/test/java/org/gearman/test/uniquejob/UniqueTetsts.java new file mode 100644 index 0000000..12ab0e5 --- /dev/null +++ b/src/test/java/org/gearman/test/uniquejob/UniqueTetsts.java @@ -0,0 +1,115 @@ +package org.gearman.test.uniquejob; + +import static org.junit.Assert.*; +import org.gearman.Gearman; +import org.gearman.GearmanClient; +import org.gearman.GearmanFunction; +import org.gearman.GearmanFunctionCallback; +import org.gearman.GearmanJobEvent; +import org.gearman.GearmanJobPriority; +import org.gearman.GearmanJobReturn; +import org.gearman.GearmanServer; +import org.gearman.GearmanWorker; +import org.junit.Before; +import org.junit.Test; + + +public class UniqueTetsts { + + Gearman gearman; + GearmanClient client; + GearmanServer server; + GearmanJobReturn jr; + GearmanJobEvent event; + GearmanWorker worker; + + private final String IP_SERVER = "127.0.0.1"; + private final Integer PORT_GEARMAN = 4730; + + private final String UNIQUE = "unique"; + private final String NOTUNIQUE = "notUnique"; + + private Integer counter = 0; + private Integer functionsReachedUnique = 0; + private Integer functionsReachedNotUnique = 0; + + @Before + public void setup(){ + this.counter = 0; + this.functionsReachedNotUnique = 0; + this.functionsReachedUnique = 0; + gearman = Gearman.createGearman(); + client = gearman.createGearmanClient(); + server = gearman.createGearmanServer(IP_SERVER, PORT_GEARMAN); + client.addServer(server); + worker = gearman.createGearmanWorker(); + worker.addServer(server); + worker.removeAllFunctions(); + } + + @Test + public void testJobNotUnique() { + client.submitBackgroundJob(getQueueInName(NOTUNIQUE), getSendByte("NOTUnique"), getPriority()); + client.submitBackgroundJob(getQueueInName(NOTUNIQUE), getSendByte("NOTUnique"), getPriority()); + client.submitBackgroundJob(getQueueInName(NOTUNIQUE), getSendByte("NOTUnique"), getPriority()); + + worker.addFunction(getQueueInName(NOTUNIQUE), new GearmanFunction() { + @Override + public byte[] work(String function, byte[] data, + GearmanFunctionCallback callback) throws Exception { + functionsReachedNotUnique++; + return null; + } + + }); + + //Sleep because of server communication latency + sleep(); + assertEquals((Integer)3, (Integer)this.functionsReachedNotUnique); + } + + @Test + public void testJobUnique() { + client.submitBackgroundJob(getQueueInName(UNIQUE), getSendByte("Unique"), getPriority(), "uniqueid"); + client.submitBackgroundJob(getQueueInName(UNIQUE), getSendByte("Unique"), getPriority(), "uniqueid"); + client.submitBackgroundJob(getQueueInName(UNIQUE), getSendByte("Unique"), getPriority(), "uniqueid"); + + worker.addFunction(getQueueInName(UNIQUE), new GearmanFunction() { + @Override + public byte[] work(String function, byte[] data, + GearmanFunctionCallback callback) throws Exception { + functionsReachedUnique++; + System.out.println(functionsReachedUnique); + return null; + } + }); + + //Sleep because of server communication latency + sleep(); + assertEquals((Integer)1, (Integer)this.functionsReachedUnique); + } + + private String getQueueInName(String function){ + return "test_queu_" + function; + } + + private byte[] getSendByte(String fromTest){ + String result = "fromTest: " + fromTest; + result += counter++; + result += " ##"; + return result.getBytes(); + } + + private GearmanJobPriority getPriority(){ + return GearmanJobPriority.NORMAL_PRIORITY; + } + + private void sleep(){ + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + +}