java通过FTP跨服务器动态监听读取指定目录下文件数据-CSDN博客

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

背景

1、文件数据在A服务器windows不定期在指定目录下生成项目应用部署在B服务器Linux
2、项目应用在B服务器监听A服务器指定目录有新生成文件进行读取文件信息持久化数据
3、提供两块内容第一安装windows FTP服务第二项目源码希望可以帮助到你。

共计4种方案试错采用了第三种方案第四种方案没有试。

1、使用jcsh.jar提供方法读取文件信息但需要A服务器开通SSH远程连接一般linux服务器都是默认开通的可直接读取连接读取windows系统需安装SSH因现场环境A服务器是windows2003故放弃这种方法。
2、曲线救国通过脚本脚本监听比较困难故放弃把A服务器信息定时存入B服务器Linux再通过jcsh.jar读取文件信息。
3、通过A服务器安装FTP服务B服务器安装FTP客户端使用java动态监听该目录下生成文件读取信息。
4、把A服务器指定目录进行共享等同于共享的这个目录就是B服务的目录了再进行读取因第三种方案成功故没有尝试第四种方案。

windows安装FTP服务

1、开启ftp服务控制面板–程序和功能–启用或关闭windows功能–标红框全部打开–点击确定
在这里插入图片描述
2、新建站点
控制面板–大图标–管理工具
在这里插入图片描述
IIS管理器
在这里插入图片描述
网站–添加FTP站点
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
以上就是windows安装FTP服务的过程我这演示了匿名创建站点谁都可以访问还可以新建用户需要用户登录才能访问。

源码

引入该依赖

<dependency>
   <groupId>commons-net</groupId>
    <artifactId>commons-net</artifactId>
    <version>3.6</version>
</dependency>

FileChangeData

@Data
public class FileChangeData {

    /**
     * 文件信息
     * */
    private FTPFile ftpFile;

    /**
     * 文件改变类型
     * */
    private FileChangeType eventType;

    /**
     * 文件名称
     * */
    private   String fileName;

    /**
     * 文件大小
     * */
    private Long fileSize;

    /**
     * FTPClient
     * */
    private FTPClient ftpClient;

    /**
     * 获取文件输入流
     * @return InputStream
     * */
    public InputStream getInputStream(String filePathName) {
        //如果是删除事件则不能够获取流
        if (Objects.equals(eventType, FileChangeType.FILE_DELETED)) {
            return null;
        }

        try {
            return ftpClient.retrieveFileStream(filePathName);
        } catch (IOException e) {
            return null;
        }
    }
}

FileChangeEvent

public interface FileChangeEvent {

    /**
     * 文件发生改变时触发此方法
     * @param fileChangeData 文件发生了改变
     * */
    @Function
    void change(FileChangeData fileChangeData) throws IOException;
}

FTPService

public interface FTPService {

    /**
     * ftp登陆
     * @return boolean 是否登陆成功
     * */
    FTPClient login();

    /**
     * ftp登出
     * @return boolean 是否登出成功
     * */
    boolean loginOut();

    /**
     * 获取文件列表
     * @return FTPFile[] 文件列表
     * */
    FTPFile[] listFile();

    /**
     * 监听文件夹的改变
     * @param fileChangeEvent 文件改变事件
     * */
    void addListenerFileChange(FileChangeEvent fileChangeEvent);
}

ListenerChangeRunnable

public interface ListenerChangeRunnable extends Runnable {

    /**
     * 停止监听文件
     * @return boolean 是否停止成功
     * */
    boolean stopListener();
}

FTPServiceImpl

@Service
public class FTPServiceImpl implements FTPService {

    @Autowired
    private FTPConfig ftpConfig;

    private String SPLIT = ":";

    private ThreadLocal<FTPClient> currentFTPClient;

    private ThreadLocal<ListenerChangeRunnable> currentListener;

    public FTPServiceImpl() {
        this.currentFTPClient = new ThreadLocal<>();
        this.currentListener = new ThreadLocal<>();
    }

    @Override
    public FTPClient login() {
        FTPClient ftpClient = new FTPClient();
        try {
            ftpClient.connect(ftpConfig.getFtpIp(), ftpConfig.getFtpPort());
            ftpClient.login(ftpConfig.getUsername(), ftpConfig.getPassword());
//            ftpClient.setControlEncoding("gb2312");
            this.currentFTPClient.set(ftpClient);
            return ftpClient;
        } catch (Exception e) {
            return null;
        }
    }

    @Override
    public boolean loginOut() {
        try {
            currentFTPClient.get().logout();
            currentFTPClient.get().disconnect();
            return Boolean.TRUE;
        } catch (Exception e) {
            return Boolean.FALSE;
        }
    }

    @Override
    public FTPFile[] listFile() {
        FTPClient ftpClient = this.currentFTPClient.get();
        try {
            return ftpClient.listFiles();
        } catch (Exception e) {
            return null;
        }
    }

    @Override
    public void addListenerFileChange(FileChangeEvent fileChangeEvent) {
        FTPClient ftpClient = this.currentFTPClient.get();
        ListenerFileChangeThreadRunnable listenerFileChangeThread = new ListenerFileChangeThreadRunnable(ftpClient, fileChangeEvent);
        this.currentListener.set(listenerFileChangeThread);
        new Thread(listenerFileChangeThread).start();
    }
}

ListenerFileChangeThreadRunnable

@Slf4j
public class ListenerFileChangeThreadRunnable implements ListenerChangeRunnable {

    private final FTPClient ftpClient;

    private volatile boolean stop;

    private final Map<String, Long> fileMemory;

    private final FileChangeEvent fileChangeEvent;

    public ListenerFileChangeThreadRunnable(FTPClient ftpClient, FileChangeEvent fileChangeEvent) {
        this.ftpClient = ftpClient;
        this.fileChangeEvent = fileChangeEvent;
        this.fileMemory = new HashMap<>();
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                FTPFile[] ftpFiles = ftpClient.listFiles();

                //判断文件被删除
                if (fileMemory.size() > 0) {
                    Set<String> fileNames = new HashSet<>();
                    for (FTPFile ftpFile : ftpFiles) {
                        if (ftpFile.isDirectory()) {
                            log.info("文件夹不做删除判断");
                            continue;
                        }
                        fileNames.add(ftpFile.getName());
                    }
                    Set<Map.Entry<String, Long>> entries = fileMemory.entrySet();
                    for (Map.Entry<String, Long> map : entries) {
                        if (!fileNames.contains(map.getKey())) {
                            log.info("文件{}被删除了", map.getKey());
                            FileChangeData fileChangeData = new FileChangeData();
                            fileChangeData.setEventType(FileChangeType.FILE_DELETED);
                            fileChangeData.setFileName(map.getKey());
                            fileChangeData.setFileSize(map.getValue());
                            fileMemory.remove(map.getKey());
                            entries.remove(map.getKey());
                            fileChangeEvent.change(fileChangeData);
                        }
                    }
                }
                //判断文件是否有更改或新增
                for (FTPFile ftpFile: ftpFiles) {
                    //判断是否为文件夹
                    if (ftpFile.isDirectory()) {
//                        log.info("{}为文件不进行监听操作", ftpFile.getName());
                        continue;
                    }
                    FileChangeData fileChangeData = new FileChangeData();
                    fileChangeData.setFileName(ftpFile.getName());
//                    fileChangeData.setFileName("D:\\ftptest\\aaa\\"+ftpFile.getName());
                    fileChangeData.setFileSize(ftpFile.getSize());
                    fileChangeData.setFtpFile(ftpFile);
                    //文件是否存在于缓存文件列表中
                    if (fileMemory.containsKey(ftpFile.getName())) {
//                        log.info("文件{}在内存中已经存在进行大小判断", ftpFile.getName());
                        if (!Objects.equals(fileMemory.get(ftpFile.getName()), ftpFile.getSize())) {
//                            log.info("文件{}在内存中已经存在且大小不一致进行更新缓存操作", ftpFile.getName());
                            fileMemory.put(ftpFile.getName(), ftpFile.getSize());
                            fileChangeData.setEventType(FileChangeType.FILE_UPDATE);
                            fileChangeEvent.change(fileChangeData);
                        }
                        continue;
                    }
//                    log.info("文件{}在内存中不存在进行缓存操作", ftpFile.getName());
                    fileMemory.put(ftpFile.getName(), ftpFile.getSize());
                    fileChangeData.setEventType(FileChangeType.FILE_ADD);
                    fileChangeEvent.change(fileChangeData);
                }
            } catch (Exception e) {
                continue;
                //throw new RuntimeException(e);
            }
            try {
                TimeUnit.SECONDS.sleep(20);
            } catch (InterruptedException e) {
                continue;
                //throw new RuntimeException(e);
            }
        }
    }

    @Override
    public boolean stopListener() {
        this.stop = Boolean.TRUE;
        this.fileMemory.clear();
        return this.stop;
    }
}

FileChangeType

public enum FileChangeType {
    FILE_UPDATE(0, "文件更新"),
    FILE_ADD(1, "文件添加"),
    FILE_DELETED(2, "文件删除");

    @Getter
    private Integer type;

    @Getter
    private String desc;

    FileChangeType(Integer type, String desc) {
        this.type = type;
        this.desc = desc;
    }
}

FTPConfig

@Data
@Configuration
public class FTPConfig {

    @Value("${ftp.ip:ftp的ip}")
    private String ftpIp;

    @Value("${ftp.port:ftp端口默认21}")
    private Integer ftpPort;

    @Value("${ftp.username:ftp创建的用户名}")
    private String username;

    @Value("${ftp.password:ftp创建的用户名密码}")
    private String password;
}

SendEmailApplicationTests

@Component
class SendEmailApplicationTests implements ApplicationRunner {
    @Autowired
    private FTPService ftpService;
    void ftpTest() {
        FTPClient ftpClient = ftpService.login();
        ftpService.addListenerFileChange(ftpFile -> {
            System.out.println(String.format("文件%s被改变了文件改变类型%s", ftpFile.getFileName(), ftpFile.getEventType().getDesc()));
            InputStream inputStream = ftpClient.retrieveFileStream("/"+ ftpFile.getFileName());
            if(inputStream!=null){
                BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream,"GBK"));
                String s = null;
                List<String> listStr = new ArrayList<>();//读取的数据在listStr
                while ((s = reader.readLine()) != null) {
                    System.out.println("===================>" + s);
                    listStr.add(s);
                }
                //处理业务逻辑
                
                inputStream.close();
                reader.close();
                ftpClient.completePendingCommand();
            }
        });
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        ftpTest();
    }
}
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: Java服务器