中文
注册

算子下推开发

  1. 使用注解@HafTarget标记需要下推的函数。

    如下所示,使用注解@HafTarget表示该函数为需要下推到卸载节点运算的函数。

    package com.haf.test.testapp;
    
    import com.huawei.haf.annotations.HafTarget;
    
    public class TestAppMain {
    
        private String ip;
    
        private int val;
    
        public TestAppMain(String ip) {
            val = 0;
            this.ip = ip;
        }
    
        @HafTarget(target = "127.0.0.1")
        public void jarOffloadVoidTest() {
            System.out.println("void test");
        }
    
        @HafTarget(target = "ip", to = {"a", "b"}, app = "add", exception = Exception.class)
        public int jarOffloadAddTest(int a, int b) {
            System.out.println("call add func");
            return a + b;
        }
    
        @HafTarget(target = "ip", to = {"a", "b"}, from = "val", timeout = "300", runInHostIfFailed = false, exception = Exception.class, app = "add")
        public int jarOffload(int a, int b) throws Exception {
            System.out.println("");
            val++;
            return a + b;
        }
    
        public static void main(String[] args) {
            String addIp;
            int numA = 0;
            int numB = 0;
            try {
                addIp = args[0];
                numA = Integer.valueOf(args[1]).intValue();
                numB = Integer.valueOf(args[2]).intValue();
            } catch (Exception e) {
                System.out.println(e.fillInStackTrace());
                return;
            }
            TestAppMain testAppMain = new TestAppMain(addIp);
            System.out.println(testAppMain.jarOffloadAddTest(numA, numB));
            testAppMain.jarOffloadVoidTest();
            try {
                testAppMain.jarOffload(numA, numB);
            } catch (Exception e) {
                System.out.println(e.fillInStackTrace());
                return;
            }
        }
    }
    表1 @HafTarget注解支持字段

    字段

    说明

    是否必填

    target

    表示卸载节点IP,函数将被下推到指定IP的节点上运行。

    target支持一个IP地址或一个类型为String的成员变量名,用户需保证该成员变量的值是一个合法IP地址。

    ×

    to

    表示下推函数的参数名。

    ×

    from

    表示函数运行结束之后,需要返回的变量名。

    ×

    timeout

    表示函数下推运行的超时时间,默认超时时间为1800s。timeout支持一个数字字符串或一个类型为int的成员变量名,用户需保证该成员变量的值是一个合法int整数。

    ×

    runInHostIfFailed

    表示如果下推失败,是否在主机节点运行该函数,默认为true,即如果函数下推失败,默认在主机节点运行函数。如果runInHostIfFailed的值设置为false,表示下推失败不在主机节点运行该函数,因此会抛出异常,需要使用exception指定抛出异常的类型。

    ×

    exception

    表示如果函数下推失败,抛出的异常类型,当runInHostIfFailed设为false时,表示需要抛出异常。

    app

    表示当前应用的名字。对于同一个JAR包项目,如果有多个函数需要使用@HafTarget注解标记下推,仅需有一个注解指定应用名。

    ×

  2. 配置pom文件,项目打包成一个整体的JAR包。
    如下所示,使用maven-assembly-plugin插件,将项目所有依赖打包在一起。
    <build>
    	<plugins>
    		<plugin>
    			<artifactId>maven-assembly-plugin</artifactId>
    			<version>3.3.0</version>
    			<configuration>
    				<archive>
    					<manifest>
    						<mainClass>com.haf.test.testapp.TestAppMain</mainClass>
    					</manifest>
    				</archive>
    				<descriptorRefs>
    					<descriptorRef>jar-with-dependencies</descriptorRef>
    				</descriptorRefs>
    			</configuration>
    			<executions>
    				<execution>
    					<id>make-assembly</id>
    					<phase>package</phase>
    					<goals>
    						<goal>single</goal>
    					</goals>
    				</execution>
    			</executions>
    		</plugin>
    	</plugins>
    </build>
  3. 开发序列化器。

    函数下推的过程中,是使用FST序列化当前对象this和to指定的参数,如果存在FST无法直接序列化的对象类型,需自行根据FST写序列化器,并使用注解@HafSerializer指定序列化器。

    如下所示,假设类Page是FST不支持的序列化对象。

    public class Page {
        private String name;
        private int age;
        public Page(String name, int age) {
            this.name = name;
            this.age = age;
        }
    
        public int getAge() {
            return age;
        }
    
        public String getName() {
            return name;
        }
    
        public void setAge(int age) {
            this.age = age;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        @Override
        public String toString() {
            return "Page{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }
    }

    针对类Page创建FST的序列化器仅需继承FST的类FSTBasicObjectSerializer,并重写writeObject和instantiate方法。

    public class PageSerializer extends FSTBasicObjectSerializer {
    
        @Override
        public void writeObject(FSTObjectOutput out,
                                Object toWrite,
                                FSTClazzInfo clzInfo,
                                FSTClazzInfo.FSTFieldInfo referencedBy,
                                int streamPosition) throws IOException {
            if (toWrite instanceof Page) {
                Page page = (Page) toWrite;
                out.writeInt(page.getAge());
                out.writeStringUTF(page.getName());
            }
        }
    
        @Override
        public Object instantiate(Class objectClass,
                                  FSTObjectInput in,
                                  FSTClazzInfo serializationInfo,
                                  FSTClazzInfo.FSTFieldInfo referencee,
                                  int streamPosition) throws Exception {
            int age = in.readInt();
            String name = in.readStringUTF();
            return new Page(name, age);
        }
    }

    对于写好的序列化器,使用注解@HafSerializer使用该序列化器。如下所示:

    @HafSerializer(clazz = Page.class, serializer = PageSerializer.class)
    @HafTarget(target = "127.0.0.1")
    public Page getPage() {
    	Page page = new Page("test", 0);
    	return page;
    }

    注解@HafSerializer包含两个字段:

    • clazz表示需要使用序列化器的类。
    • serializer表示使用的自定义序列化器。
  4. 编译项目。
搜索结果
找到“0”个结果

当前产品无相关内容

未找到相关内容,请尝试其他搜索词