diff --git a/src/main/java/org/gearman/GearmanClient.java b/src/main/java/org/gearman/GearmanClient.java
index 464c397..a1b5c81 100644
--- a/src/main/java/org/gearman/GearmanClient.java
+++ b/src/main/java/org/gearman/GearmanClient.java
@@ -157,6 +157,23 @@ public interface GearmanClient extends GearmanService {
*/
public GearmanJobReturn submitBackgroundJob(String functionName, byte[] data, GearmanJobPriority priority);
+ /**
+ * Submits a background job to a registered job server
+ * @param functionName
+ * gearman function name
+ * @param data
+ * gearman job data
+ * @param priority
+ * gearman job priority
+ * @param uniqueid
+ * gearman unique job id (not queue-in the same job multiple times)
+ * @return
+ * The job return used to poll submit operation status
+ * @throws NullPointerException
+ * If the function name is null
+ */
+ public GearmanJobReturn submitBackgroundJob(String functionName, byte[] data, GearmanJobPriority priority, String uniqueid);
+
/**
* Submits a background job to a registered job server
* @param functionName
@@ -245,4 +262,5 @@ public interface GearmanClient extends GearmanService {
* The policy for handling unexpected disconnects
*/
public void setLostConnectionPolicy(GearmanLostConnectionPolicy policy);
+
}
\ No newline at end of file
diff --git a/src/main/java/org/gearman/impl/client/ClientImpl.java b/src/main/java/org/gearman/impl/client/ClientImpl.java
index a3140ad..f2ebf27 100644
--- a/src/main/java/org/gearman/impl/client/ClientImpl.java
+++ b/src/main/java/org/gearman/impl/client/ClientImpl.java
@@ -30,6 +30,7 @@
import java.util.Deque;
import java.util.List;
import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -50,7 +51,6 @@
import org.gearman.impl.serverpool.ControllerState;
import org.gearman.impl.serverpool.GearmanJobStatusImpl;
import org.gearman.impl.util.ByteArray;
-import org.gearman.impl.util.GearmanUtils;
import org.gearman.impl.util.TaskJoin;
public class ClientImpl extends AbstractJobServerPool implements GearmanClient {
@@ -459,17 +459,26 @@ public GearmanJobReturn submitBackgroundJob(String functionName, byte[] data, Ge
return submitJob(functionName, data, priority, true);
}
+ @Override
+ public GearmanJobReturn submitBackgroundJob(String functionName, byte[] data, GearmanJobPriority priority, String uniqueId) {
+ return submitJob(functionName, data, priority, true, uniqueId);
+ }
+
private GearmanJobReturn submitJob(String functionName, byte[] data, GearmanJobPriority priority, boolean isBackground) {
+ return submitJob(functionName, data, priority, isBackground, null);
+ }
+
+ private GearmanJobReturn submitJob(String functionName, byte[] data, GearmanJobPriority priority, boolean isBackground, String uniqueId) {
final GearmanJobReturnImpl jobReturn = new GearmanJobReturnImpl();
- submitJob(jobReturn, functionName, data, priority, isBackground);
+ submitJob(jobReturn, functionName, data, priority, isBackground, uniqueId);
return jobReturn;
}
- private void submitJob(BackendJobReturn jobReturn, String functionName, byte[] data, GearmanJobPriority priority, boolean isBackground) {
+ private void submitJob(BackendJobReturn jobReturn, String functionName, byte[] data, GearmanJobPriority priority, boolean isBackground, String uniqueId) {
if(functionName==null) throw new NullPointerException();
if(data==null) data = new byte[0];
if(priority==null) priority = GearmanJobPriority.NORMAL_PRIORITY;
-
+ if(uniqueId==null) uniqueId = UUID.randomUUID().toString();
if(this.isShutdown()) {
jobReturn.eof(GearmanJobEventImmutable.GEARMAN_SUBMIT_FAIL_SERVICE_SHUTDOWN);
return;
@@ -478,7 +487,7 @@ private void submitJob(BackendJobReturn jobReturn, String functionName, byte[] d
return;
}
- this.addJob(new ClientJobSubmission(functionName, data, GearmanUtils.createUID() , jobReturn, priority, isBackground));
+ this.addJob(new ClientJobSubmission(functionName, data, uniqueId.getBytes() , jobReturn, priority, isBackground));
}
@Override
@@ -505,7 +514,7 @@ private GearmanJoin submitJob(String functionName, byte[] data, GearmanJo
if(callback==null) throw new NullPointerException();
final GearmanJobEventCallbackCaller jobReturn = new GearmanJobEventCallbackCaller(attachment, callback, this.getGearman().getScheduler());
- submitJob(jobReturn, functionName, data, priority, isBackground);
+ submitJob(jobReturn, functionName, data, priority, isBackground, null);
return jobReturn;
}
}
diff --git a/src/test/java/org/gearman/test/uniquejob/UniqueTetsts.java b/src/test/java/org/gearman/test/uniquejob/UniqueTetsts.java
new file mode 100644
index 0000000..12ab0e5
--- /dev/null
+++ b/src/test/java/org/gearman/test/uniquejob/UniqueTetsts.java
@@ -0,0 +1,115 @@
+package org.gearman.test.uniquejob;
+
+import static org.junit.Assert.*;
+import org.gearman.Gearman;
+import org.gearman.GearmanClient;
+import org.gearman.GearmanFunction;
+import org.gearman.GearmanFunctionCallback;
+import org.gearman.GearmanJobEvent;
+import org.gearman.GearmanJobPriority;
+import org.gearman.GearmanJobReturn;
+import org.gearman.GearmanServer;
+import org.gearman.GearmanWorker;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class UniqueTetsts {
+
+ Gearman gearman;
+ GearmanClient client;
+ GearmanServer server;
+ GearmanJobReturn jr;
+ GearmanJobEvent event;
+ GearmanWorker worker;
+
+ private final String IP_SERVER = "127.0.0.1";
+ private final Integer PORT_GEARMAN = 4730;
+
+ private final String UNIQUE = "unique";
+ private final String NOTUNIQUE = "notUnique";
+
+ private Integer counter = 0;
+ private Integer functionsReachedUnique = 0;
+ private Integer functionsReachedNotUnique = 0;
+
+ @Before
+ public void setup(){
+ this.counter = 0;
+ this.functionsReachedNotUnique = 0;
+ this.functionsReachedUnique = 0;
+ gearman = Gearman.createGearman();
+ client = gearman.createGearmanClient();
+ server = gearman.createGearmanServer(IP_SERVER, PORT_GEARMAN);
+ client.addServer(server);
+ worker = gearman.createGearmanWorker();
+ worker.addServer(server);
+ worker.removeAllFunctions();
+ }
+
+ @Test
+ public void testJobNotUnique() {
+ client.submitBackgroundJob(getQueueInName(NOTUNIQUE), getSendByte("NOTUnique"), getPriority());
+ client.submitBackgroundJob(getQueueInName(NOTUNIQUE), getSendByte("NOTUnique"), getPriority());
+ client.submitBackgroundJob(getQueueInName(NOTUNIQUE), getSendByte("NOTUnique"), getPriority());
+
+ worker.addFunction(getQueueInName(NOTUNIQUE), new GearmanFunction() {
+ @Override
+ public byte[] work(String function, byte[] data,
+ GearmanFunctionCallback callback) throws Exception {
+ functionsReachedNotUnique++;
+ return null;
+ }
+
+ });
+
+ //Sleep because of server communication latency
+ sleep();
+ assertEquals((Integer)3, (Integer)this.functionsReachedNotUnique);
+ }
+
+ @Test
+ public void testJobUnique() {
+ client.submitBackgroundJob(getQueueInName(UNIQUE), getSendByte("Unique"), getPriority(), "uniqueid");
+ client.submitBackgroundJob(getQueueInName(UNIQUE), getSendByte("Unique"), getPriority(), "uniqueid");
+ client.submitBackgroundJob(getQueueInName(UNIQUE), getSendByte("Unique"), getPriority(), "uniqueid");
+
+ worker.addFunction(getQueueInName(UNIQUE), new GearmanFunction() {
+ @Override
+ public byte[] work(String function, byte[] data,
+ GearmanFunctionCallback callback) throws Exception {
+ functionsReachedUnique++;
+ System.out.println(functionsReachedUnique);
+ return null;
+ }
+ });
+
+ //Sleep because of server communication latency
+ sleep();
+ assertEquals((Integer)1, (Integer)this.functionsReachedUnique);
+ }
+
+ private String getQueueInName(String function){
+ return "test_queu_" + function;
+ }
+
+ private byte[] getSendByte(String fromTest){
+ String result = "fromTest: " + fromTest;
+ result += counter++;
+ result += " ##";
+ return result.getBytes();
+ }
+
+ private GearmanJobPriority getPriority(){
+ return GearmanJobPriority.NORMAL_PRIORITY;
+ }
+
+ private void sleep(){
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+}