据我所知,没有选择使用gemfire中的查询来更新单个列。要更新单个列,我目前正在获取整个旧对象并修改更改的值并将其存储。如果有人实现了更新单个列的任何东西,请分享。
@Region("tracking")
public class Tracking implements Serializable {
public String id;
public String status;
public String program;
}
@Region("tracking")
public interface TrackingQueryRepository extends CrudRepository<Tracking, String> {
}
我是Delta传播实施的新手。我已经阅读了用户指南并尝试实施并收到了下面给出的例外情况。你能分享你对此的想法吗?
Another.java-域名类
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import org.springframework.data.annotation.Id;
import org.springframework.data.gemfire.mapping.Region;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.gemstone.gemfire.Delta;
import com.gemstone.gemfire.InvalidDeltaException;
@Region("delta")
public class Another implements Delta, Serializable {
private static final long serialVersionUID = 1L;
@Id
private String anotherId;
@JsonProperty("anotherProgramId")
private String anotherProgramId;
public Another() {
}
public Another(String anotherId, String anotherProgramId) {
this.anotherId = anotherId;
this.anotherProgramId = anotherProgramId;
}
public String getAnotherId() {
return anotherId;
}
public void setAnotherId(String anotherId) {
this.anotherIdChd = true;
this.anotherId = anotherId;
}
public String getAnotherProgramId() {
return anotherProgramId;
}
public void setAnotherProgramId(String anotherProgramId) {
this.anotherProgramIdChd = true;
this.anotherProgramId = anotherProgramId;
}
private transient boolean anotherIdChd = false;
private transient boolean anotherProgramIdChd = false;
@Override
public String toString() {
return "Another [anotherId=" + anotherId + ", anotherProgramId=" + anotherProgramId + "]";
}
@Override
public void fromDelta(DataInput in) throws IOException, InvalidDeltaException {
if (in.readBoolean()) {
// Read the change and apply it to the object
this.anotherId = in.toString();
System.out.println(" Applied delta to field 'anotherId' = " + this.anotherId);
}
if (in.readBoolean()) {
this.anotherProgramId = in.toString();
System.out.println(" Applied delta to field 'anotherProgramId' = " + this.anotherProgramId);
}
}
@Override
public boolean hasDelta() {
return this.anotherIdChd || this.anotherProgramIdChd;
}
@Override
public void toDelta(DataOutput out) throws IOException {
System.out.println("Extracting delta from " + this.toString());
out.writeBoolean(anotherIdChd);
if (anotherIdChd) {
// Write just the changes into the data stream
out.writeUTF(this.anotherId);
// Once the delta information is written, reset the delta status
// field
this.anotherIdChd = false;
System.out.println(" Extracted delta from field 'anotherId' = " + this.anotherId);
}
out.writeBoolean(anotherProgramIdChd);
if (anotherProgramIdChd) {
out.writeUTF(this.anotherProgramId);
this.anotherProgramIdChd = false;
System.out.println(" Extracted delta from field 'anotherProgramId' = " + this.anotherProgramId);
}
}
}
client-cache. xml
<pdx>
<pdx-serializer>
<class-name>com.gemstone.gemfire.pdx.ReflectionBasedAutoSerializer</class-name>
<parameter name="classes">
<string>com\.rs\.main\..+</string>
</parameter>
</pdx-serializer>
</pdx>
SpringXML命名空间
<util:properties id="gemfire-props">
<prop key="delta-propagation">true</prop>
</util:properties>
<gfe:client-cache pool-name="serverPool" cache-xml-location="classpath:client-cache.xml" properties-ref="gemfire-props"/>
<gfe:client-region id="delta" pool-name="serverPool" shortcut="PROXY" cloning-enabled="true">
本地gefire实例版本-pivotal-gemfire-9.0.1
区域创建创建区域-name=delta-type=REPLICATE
例外:
2017-05-08 22:17:12.370 ERROR 14696 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.dao.DataAccessResourceFailureException: remote server on 10.148.210.249(:loner):53784:e10627eb: com.gemstone.gemfire.pdx.PdxSerializationException: Could not create an instance of a class com.rs.main.Another; nested exception is com.gemstone.gemfire.cache.client.ServerOperationException: remote server on 10.148.210.249(:loner):53784:e10627eb: com.gemstone.gemfire.pdx.PdxSerializationException: Could not create an instance of a class com.rs.main.Another] with root cause
java.lang.ClassNotFoundException: com.rs.main.Another
at org.apache.geode.internal.ClassPathLoader.forName(ClassPathLoader.java:437) ~[na:na]
at org.apache.geode.internal.InternalDataSerializer.getCachedClass(InternalDataSerializer.java:4010) ~[na:na]
at org.apache.geode.pdx.internal.PdxType.getPdxClass(PdxType.java:235) ~[na:na]
at org.apache.geode.pdx.internal.PdxReaderImpl.basicGetObject(PdxReaderImpl.java:687) ~[na:na]
at org.apache.geode.pdx.internal.PdxReaderImpl.getObject(PdxReaderImpl.java:682) ~[na:na]
at org.apache.geode.internal.InternalDataSerializer.readPdxSerializable(InternalDataSerializer.java:3218) ~[na:na]
at org.apache.geode.internal.InternalDataSerializer.basicReadObject(InternalDataSerializer.java:3005) ~[na:na]
at org.apache.geode.DataSerializer.readObject(DataSerializer.java:2897) ~[na:na]
at org.apache.geode.internal.util.BlobHelper.deserializeBlob(BlobHelper.java:90) ~[na:na]
at org.apache.geode.internal.cache.EntryEventImpl.deserialize(EntryEventImpl.java:1891) ~[na:na]
at org.apache.geode.internal.cache.EntryEventImpl.deserialize(EntryEventImpl.java:1884) ~[na:na]
at org.apache.geode.internal.cache.VMCachedDeserializable.getDeserializedValue(VMCachedDeserializable.java:134) ~[na:na]
at org.apache.geode.internal.cache.EntryEventImpl.processDeltaBytes(EntryEventImpl.java:1687) ~[na:na]
at org.apache.geode.internal.cache.EntryEventImpl.setNewValueInRegion(EntryEventImpl.java:1558) ~[na:na]
at org.apache.geode.internal.cache.EntryEventImpl.putExistingEntry(EntryEventImpl.java:1504) ~[na:na]
at org.apache.geode.internal.cache.AbstractRegionMap.updateEntry(AbstractRegionMap.java:2959) ~[na:na]
at org.apache.geode.internal.cache.AbstractRegionMap.basicPut(AbstractRegionMap.java:2782) ~[na:na]
at org.apache.geode.internal.cache.LocalRegion.virtualPut(LocalRegion.java:5750) ~[na:na]
at org.apache.geode.internal.cache.DistributedRegion.virtualPut(DistributedRegion.java:337) ~[na:na]
at org.apache.geode.internal.cache.LocalRegionDataView.putEntry(LocalRegionDataView.java:151) ~[na:na]
at org.apache.geode.internal.cache.LocalRegion.basicUpdate(LocalRegion.java:5730) ~[na:na]
at org.apache.geode.internal.cache.LocalRegion.basicBridgePut(LocalRegion.java:5374) ~[na:na]
at org.apache.geode.internal.cache.tier.sockets.command.Put65.cmdExecute(Put65.java:381) ~[na:na]
at org.apache.geode.internal.cache.tier.sockets.BaseCommand.execute(BaseCommand.java:141) ~[na:na]
at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doNormalMsg(ServerConnection.java:776) ~[na:na]
at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doOneMessage(ServerConnection.java:904) ~[na:na]
at org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:1160) ~[na:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_121]
at org.apache.geode.internal.cache.tier.sockets.AcceptorImpl$1$1.run(AcceptorImpl.java:519) ~[na:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
嗨(再次)Vigneshwaran-
对,所以GemFire的Query功能(通过QueryService)严格用于运行查询(即SELECT语句)。GemFire OQL中没有用于UPDATES和DELETES的等价物。GemFire是一个Key/Value存储,具有类似于Map
的操作(例如get(key)
、put(key, value)
等),您通常在其中处理整个应用程序域对象。但是,无论您的应用程序是对等缓存(即集群的成员)还是缓存客户端,GemFire的一些功能都可以为您提供帮助。通常,应用程序是缓存客户端,并具有/使用ClientCache,其中集群是独立的,客户端连接到集群很像RDBMS。
我还要说,虽然Function服务很有用,但它不是唯一的选择,实际上在代码方面可能会有更多的开销。
正如Wes在上面提到的,使用PARTITION Region是非常典型的,尤其是对于“事务性”数据(注意:复制区域更适用于不经常更改的参考数据)。
在“Function”可以帮助您的地方,您可以编写Function以获取对应用程序域对象的更新。“update”可以在Function的“参数”中传递。要调用Function,您可以使用GemFire的FunctionService来执行,使用目标方法之一(例如[onRegion(“跟踪”)][7]
)。
注意:其他目标方法(即on会员
和onServer
)分别特定于您的应用程序是“对等体”还是“客户端”。例如,如果您的应用程序是客户端,则不能调用on会员
,因为它假定您的应用程序是“对等体”。同样,如果您的应用程序是对等体,则不能调用onServer(s)
,因为它假定您的应用程序是“客户端”。onRegion(…)
无论应用程序是对等体还是客户端都有效。虽然您可能会想为什么不一直使用onRegion
,但根据您的UC(例如,考虑服务器组和路由)使用其他形式的定位具有技术优势。
当Region是一个PARTITION时,您还可以设置Function的[OptimizeForWrite()][8]
,这意味着Function将更新Region数据,因此,当使用Wes上面描述的过滤选项指定键时,将被路由到PARTITION的键主存储桶。
PARTITION Region的一致性来自这样一个事实,即所有更新都首先被路由并写入“主”(无论哪个服务器接收客户端的更新,也可能是一个甚至不托管该Region或相关数据/密钥的服务器;即不同的分片)。在主服务器更新后,然后数据更改被传播(分布)到集群中托管分区/分片数据集的辅助节点的其他节点。这就是Wes上面提到的“事务”一致性。
注意:PARTITION只是数据分片的另一个词,其中数据均匀分布在可用节点的集群中。当添加/删除节点时,数据会重新平衡。PARTITION也可以具有冗余。这些被称为辅助。PARTITION区域有助于延迟和吞吐量,因为数据被划分(默认分为113个桶),其中每个桶都有主副本,可能有1个或多个副本(辅助,用于冗余;HA),从而提高读写吞吐量。
此外,如果数据必须保留,那么您还可以设置Function的HA属性。这将允许在失败的情况下重试。
然而,尽管有所有这些优势,您仍然必须处理服务器上Function中更新应用程序域对象的“操作方法”。您还必须处理“映射”,因为像GemFire这样的Key/Value存储中确实没有等效的ORM。当然,这并不难,但也许有更好的方法。
还有另一个功能,称为Delta传播。本质上,每当您进行更新时,您总是会获取并更新GemFire中的完整值。
注意:可以像投影一样查询对象的选择字段,但它不是代理或与实际对象相关。
当您利用GemFire的序列化功能时,您可以利用Delta传播。
当实现“增量”时,只有应用程序域对象中的差异被实际序列化,通过线路发送,无论是在客户端和服务器之间,还是在坚持冗余策略时在对等方之间。这对你来说是完全无缝的。你得到你的对象(客户端),更新它,然后把它放进去。GemFire为你处理发送“增量”的逻辑。
此外,在集群中的服务器上使用客户端/服务器拓扑和PARTITION区域时,您可以启用单跳访问,这会有效地将数据路由到包含“主”存储桶的服务器,从而避免额外的网络跃点,这将影响您对每次操作的感知延迟。
因此,在Deltas和Single-Hop之间,您最终会得到一个非常高性能的解决方案,并且仍然可以利用面向对象的方法,如您所期望的那样使用您的应用程序域对象API。
但是,请注意使用Deltas的陷阱。
不管怎样,思考的食粮。你通常有不止一种方法来完成一项任务,但是更好的方法并不总是显而易见的,直到你根据你的UC/目标来衡量和评估预期的效果。
干杯约翰
您不能使用查询服务更新列。我建议您考虑Function服务以实现事务一致性。使区域分区并使用. onRegion().withFilter(key).withArgs(列和值映射)调用函数。
您的函数将读取对象、应用更新并放置。
通过这种方式,您的读取和更新将发生在服务器上的单个线程中,确保事务一致性,而不是读取客户端上的对象,更改值,执行放置操作,并希望没有其他人在您下面溜进来。
我们实现相同目标的另一种方法是使用自定义函数,该函数在获取分布式锁后更新值(BeanUtils)。
这可能会增加性能开销,但保证数据完整性。这就是权衡。
见下面的伪代码
try{
//this can be regionName
dls = DistributedLockService.getServiceNamed(arbitrary-lock-name)
//the key is normally the object @Id
dls.lock(some-key, waitTimeOut, leaseTimeOut)
row = region.get(id)
//Here we copy the desired value (input to function) to the latest value
BeanUtils.copyProperty(row, key, value);
//Insert the modified record to Gemfire - now this becomes equivalent of update <region> set value = for a specific property.
region.put(id, row)
} finally{
dls.unlock(some-key);
}